File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/dataflow/apis.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for interacting with the Cloud Dataflow API."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import json
import os
import shutil
import stat
import textwrap
from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk.api_lib.cloudbuild import cloudbuild_util
from googlecloudsdk.api_lib.storage import storage_api
from googlecloudsdk.api_lib.storage import storage_util
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.api_lib.util import exceptions
from googlecloudsdk.command_lib.builds import submit_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import files
import six
DATAFLOW_API_NAME = 'dataflow'
DATAFLOW_API_VERSION = 'v1b3'
# TODO(b/139889563): Remove when dataflow args region is changed to required
DATAFLOW_API_DEFAULT_REGION = 'us-central1'
def GetMessagesModule():
return apis.GetMessagesModule(DATAFLOW_API_NAME, DATAFLOW_API_VERSION)
def GetClientInstance():
return apis.GetClientInstance(DATAFLOW_API_NAME, DATAFLOW_API_VERSION)
def GetProject():
return properties.VALUES.core.project.Get(required=True)
def _GetBaseImagePath(image, is_distroless=False):
"""Returns full the image path of the given image."""
if not is_distroless:
return (
f'gcr.io/dataflow-templates-base/{image}-template-launcher-base:latest'
)
return f'gcr.io/dataflow-templates-base/{image}-template-launcher-base-distroless:latest'
class Jobs:
"""The Jobs set of Dataflow API functions."""
GET_REQUEST = GetMessagesModule().DataflowProjectsLocationsJobsGetRequest
LIST_REQUEST = GetMessagesModule().DataflowProjectsLocationsJobsListRequest
AGGREGATED_LIST_REQUEST = GetMessagesModule(
).DataflowProjectsJobsAggregatedRequest
UPDATE_REQUEST = GetMessagesModule(
).DataflowProjectsLocationsJobsUpdateRequest
@staticmethod
def GetService():
return GetClientInstance().projects_locations_jobs
@staticmethod
def Get(job_id, project_id=None, region_id=None, view=None):
"""Calls the Dataflow Jobs.Get method.
Args:
job_id: Identifies a single job.
project_id: The project which owns the job.
region_id: The regional endpoint where the job lives.
view: (DataflowProjectsJobsGetRequest.ViewValueValuesEnum) Level of
information requested in response.
Returns:
(Job)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
request = GetMessagesModule().DataflowProjectsLocationsJobsGetRequest(
jobId=job_id, location=region_id, projectId=project_id, view=view)
try:
return Jobs.GetService().Get(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def Cancel(job_id, force=False, project_id=None, region_id=None):
"""Cancels a job by calling the Jobs.Update method.
Args:
job_id: Identifies a single job.
force: True to forcibly cancel the job.
project_id: The project which owns the job.
region_id: The regional endpoint where the job lives.
Returns:
(Job)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
labels = None
if force:
labels = GetMessagesModule().Job.LabelsValue(additionalProperties=[
GetMessagesModule().Job.LabelsValue.AdditionalProperty(
key='force_cancel_job', value='true')
])
job = GetMessagesModule().Job(
labels=labels,
requestedState=(GetMessagesModule().Job.RequestedStateValueValuesEnum
.JOB_STATE_CANCELLED))
request = GetMessagesModule().DataflowProjectsLocationsJobsUpdateRequest(
jobId=job_id, location=region_id, projectId=project_id, job=job)
try:
return Jobs.GetService().Update(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def UpdateOptions(
job_id,
project_id=None,
region_id=None,
min_num_workers=None,
max_num_workers=None,
worker_utilization_hint=None,
unset_worker_utilization_hint=None,
):
"""Update pipeline options on a running job.
You should specify at-least one (or both) of min_num_workers and
max_num_workers.
Args:
job_id: ID of job to update
project_id: Project of the job
region_id: Region the job is in
min_num_workers: Lower-bound for worker autoscaling
max_num_workers: Upper-bound for worker autoscaling
worker_utilization_hint: Target CPU utilization for worker autoscaling
unset_worker_utilization_hint: Unsets worker_utilization_hint value
Returns:
The updated Job
"""
project_id = project_id or GetProject()
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
job = GetMessagesModule().Job(
runtimeUpdatableParams=GetMessagesModule().RuntimeUpdatableParams(
minNumWorkers=min_num_workers,
maxNumWorkers=max_num_workers,
workerUtilizationHint=(
None
if unset_worker_utilization_hint
else worker_utilization_hint
),
)
)
update_mask_pieces = []
if min_num_workers is not None:
update_mask_pieces.append('runtime_updatable_params.min_num_workers')
if max_num_workers is not None:
update_mask_pieces.append('runtime_updatable_params.max_num_workers')
if (
worker_utilization_hint is not None
or unset_worker_utilization_hint
):
update_mask_pieces.append(
'runtime_updatable_params.worker_utilization_hint'
)
update_mask = ','.join(update_mask_pieces)
request = GetMessagesModule().DataflowProjectsLocationsJobsUpdateRequest(
jobId=job_id,
location=region_id,
projectId=project_id,
job=job,
updateMask=update_mask,
)
try:
return Jobs.GetService().Update(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def Drain(job_id, project_id=None, region_id=None):
"""Drains a job by calling the Jobs.Update method.
Args:
job_id: Identifies a single job.
project_id: The project which owns the job.
region_id: The regional endpoint where the job lives.
Returns:
(Job)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
job = GetMessagesModule().Job(
requestedState=(GetMessagesModule().Job.RequestedStateValueValuesEnum
.JOB_STATE_DRAINED))
request = GetMessagesModule().DataflowProjectsLocationsJobsUpdateRequest(
jobId=job_id, location=region_id, projectId=project_id, job=job)
try:
return Jobs.GetService().Update(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def ResumeUnsupportedSDK(job_id,
experiment_with_token,
project_id=None,
region_id=None):
"""Resumes a job by calling the Jobs.Update method.
Args:
job_id: Identifies a single job.
experiment_with_token: The resume token unique to the job prefixed with
the experiment key.
project_id: The project which owns the job.
region_id: The regional endpoint where the job lives.
Returns:
(Job)
"""
project_id = project_id or GetProject()
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
environment = GetMessagesModule().Environment(
experiments=[experiment_with_token])
job = GetMessagesModule().Job(environment=environment)
request = GetMessagesModule().DataflowProjectsLocationsJobsUpdateRequest(
jobId=job_id, location=region_id, projectId=project_id, job=job)
try:
return Jobs.GetService().Update(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def Snapshot(job_id,
project_id=None,
region_id=None,
ttl='604800s',
snapshot_sources=False):
"""Takes a snapshot of a job via the Jobs.Snapshot method.
Args:
job_id: Identifies a single job.
project_id: The project which owns the job.
region_id: The regional endpoint where the job lives.
ttl: The ttl for the snapshot.
snapshot_sources: If true, the sources will be snapshotted.
Returns:
(Snapshot)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
request = GetMessagesModule().DataflowProjectsLocationsJobsSnapshotRequest(
jobId=job_id,
location=region_id,
projectId=project_id,
snapshotJobRequest=GetMessagesModule().SnapshotJobRequest(
location=region_id, ttl=ttl, snapshotSources=snapshot_sources),
)
try:
return Jobs.GetService().Snapshot(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
class Metrics:
"""The Metrics set of Dataflow API functions."""
GET_REQUEST = GetMessagesModule(
).DataflowProjectsLocationsJobsGetMetricsRequest
@staticmethod
def GetService():
return GetClientInstance().projects_locations_jobs
@staticmethod
def Get(job_id, project_id=None, region_id=None, start_time=None):
"""Calls the Dataflow Metrics.Get method.
Args:
job_id: The job to get messages for.
project_id: The project which owns the job.
region_id: The regional endpoint of the job.
start_time: Return only metric data that has changed since this time.
Default is to return all information about all metrics for the job.
Returns:
(MetricUpdate)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
request = GetMessagesModule(
).DataflowProjectsLocationsJobsGetMetricsRequest(
jobId=job_id,
location=region_id,
projectId=project_id,
startTime=start_time)
try:
return Metrics.GetService().GetMetrics(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
class TemplateArguments:
"""Wrapper class for template arguments."""
project_id = None
region_id = None
gcs_location = None
job_name = None
zone = None
max_workers = None
num_workers = None
network = None
subnetwork = None
worker_machine_type = None
launcher_machine_type = None
staging_location = None
temp_location = None
kms_key_name = None
disable_public_ips = None
parameters = None
service_account_email = None
worker_region = None
worker_zone = None
enable_streaming_engine = None
additional_experiments = None
additional_pipeline_options = None
additional_user_labels = None
streaming_update = None
transform_name_mappings = None
flexrs_goal = None
def __init__(self,
project_id=None,
region_id=None,
job_name=None,
gcs_location=None,
zone=None,
max_workers=None,
num_workers=None,
network=None,
subnetwork=None,
worker_machine_type=None,
launcher_machine_type=None,
staging_location=None,
temp_location=None,
kms_key_name=None,
disable_public_ips=None,
parameters=None,
service_account_email=None,
worker_region=None,
worker_zone=None,
enable_streaming_engine=None,
additional_experiments=None,
additional_pipeline_options=None,
additional_user_labels=None,
streaming_update=None,
transform_name_mappings=None,
flexrs_goal=None):
self.project_id = project_id
self.region_id = region_id
self.job_name = job_name
self.gcs_location = gcs_location
self.zone = zone
self.max_workers = max_workers
self.num_workers = num_workers
self.network = network
self.subnetwork = subnetwork
self.worker_machine_type = worker_machine_type
self.launcher_machine_type = launcher_machine_type
self.staging_location = staging_location
self.temp_location = temp_location
self.kms_key_name = kms_key_name
self.disable_public_ips = disable_public_ips
self.parameters = parameters
self.service_account_email = service_account_email
self.worker_region = worker_region
self.worker_zone = worker_zone
self.enable_streaming_engine = enable_streaming_engine
self.additional_experiments = additional_experiments
self.additional_pipeline_options = additional_pipeline_options
self.additional_user_labels = additional_user_labels
self.streaming_update = streaming_update
self.transform_name_mappings = transform_name_mappings
self.flexrs_goal = flexrs_goal
class Templates:
"""The Templates set of Dataflow API functions."""
CREATE_REQUEST = GetMessagesModule().CreateJobFromTemplateRequest
LAUNCH_TEMPLATE_PARAMETERS = GetMessagesModule().LaunchTemplateParameters
LAUNCH_TEMPLATE_PARAMETERS_VALUE = LAUNCH_TEMPLATE_PARAMETERS.ParametersValue
LAUNCH_FLEX_TEMPLATE_REQUEST = GetMessagesModule().LaunchFlexTemplateRequest
PARAMETERS_VALUE = CREATE_REQUEST.ParametersValue
FLEX_TEMPLATE_ENVIRONMENT = GetMessagesModule().FlexTemplateRuntimeEnvironment
FLEX_TEMPLATE_USER_LABELS_VALUE = (
FLEX_TEMPLATE_ENVIRONMENT.AdditionalUserLabelsValue
)
DYNAMIC_TEMPLATE_TRANSFORM_NAME_MAPPING_VALUE = (
LAUNCH_TEMPLATE_PARAMETERS.TransformNameMappingValue
)
FLEX_TEMPLATE_PARAMETER = GetMessagesModule().LaunchFlexTemplateParameter
FLEX_TEMPLATE_PARAMETERS_VALUE = FLEX_TEMPLATE_PARAMETER.ParametersValue
FLEX_TEMPLATE_TRANSFORM_NAME_MAPPING_VALUE = (
FLEX_TEMPLATE_PARAMETER.TransformNameMappingsValue
)
IP_CONFIGURATION_ENUM_VALUE = GetMessagesModule(
).FlexTemplateRuntimeEnvironment.IpConfigurationValueValuesEnum
FLEXRS_GOAL_ENUM_VALUE = GetMessagesModule(
).FlexTemplateRuntimeEnvironment.FlexrsGoalValueValuesEnum
TEMPLATE_METADATA = GetMessagesModule().TemplateMetadata
SDK_INFO = GetMessagesModule().SDKInfo
SDK_LANGUAGE = GetMessagesModule().SDKInfo.LanguageValueValuesEnum
CONTAINER_SPEC = GetMessagesModule().ContainerSpec
FLEX_TEMPLATE_JAVA11_BASE_IMAGE = _GetBaseImagePath('java11')
FLEX_TEMPLATE_JAVA17_BASE_IMAGE = _GetBaseImagePath('java17')
FLEX_TEMPLATE_JAVA21_BASE_IMAGE = _GetBaseImagePath('java21')
FLEX_TEMPLATE_JAVA11_DISTROLESS_BASE_IMAGE = _GetBaseImagePath('java11', True)
FLEX_TEMPLATE_JAVA17_DISTROLESS_BASE_IMAGE = _GetBaseImagePath('java17', True)
FLEX_TEMPLATE_JAVA21_DISTROLESS_BASE_IMAGE = _GetBaseImagePath('java21', True)
FLEX_TEMPLATE_PYTHON3_BASE_IMAGE = _GetBaseImagePath('python3')
FLEX_TEMPLATE_GO_BASE_IMAGE = _GetBaseImagePath('go')
FLEX_TEMPLATE_GO_DISTROLESS_BASE_IMAGE = _GetBaseImagePath('go', True)
YAML_TEMPLATE_GCS_LOCATION = (
'gs://dataflow-templates-{}/latest/flex/Yaml_Template'
)
ALL_PERMISSIONS_MASK = (
stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
)
FILE_PERMISSIONS_MASK = (
stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
)
# Directories need +x for access.
DIR_PERMISSIONS_MASK = (
stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IXOTH | stat.S_IROTH
)
@staticmethod
def GetService():
return GetClientInstance().projects_locations_templates
@staticmethod
def GetFlexTemplateService():
return GetClientInstance().projects_locations_flexTemplates
@staticmethod
def Create(template_args=None):
"""Calls the Dataflow Templates.CreateFromJob method.
Args:
template_args: Arguments for create template.
Returns:
(Job)
"""
params_list = []
parameters = template_args.parameters
for k, v in six.iteritems(parameters) if parameters else {}:
params_list.append(
Templates.PARAMETERS_VALUE.AdditionalProperty(key=k, value=v))
# TODO(b/139889563): Remove default when args region is changed to required
region_id = template_args.region_id or DATAFLOW_API_DEFAULT_REGION
ip_configuration_enum = GetMessagesModule(
).RuntimeEnvironment.IpConfigurationValueValuesEnum
ip_private = ip_configuration_enum.WORKER_IP_PRIVATE
ip_configuration = ip_private if template_args.disable_public_ips else None
user_labels_value = GetMessagesModule(
).RuntimeEnvironment.AdditionalUserLabelsValue
user_labels_list = Templates.__ConvertDictArguments(
template_args.additional_user_labels,
user_labels_value)
body = Templates.CREATE_REQUEST(
gcsPath=template_args.gcs_location,
jobName=template_args.job_name,
location=region_id,
environment=GetMessagesModule().RuntimeEnvironment(
serviceAccountEmail=template_args.service_account_email,
zone=template_args.zone,
maxWorkers=template_args.max_workers,
numWorkers=template_args.num_workers,
network=template_args.network,
subnetwork=template_args.subnetwork,
machineType=template_args.worker_machine_type,
tempLocation=template_args.staging_location,
kmsKeyName=template_args.kms_key_name,
ipConfiguration=ip_configuration,
workerRegion=template_args.worker_region,
workerZone=template_args.worker_zone,
enableStreamingEngine=template_args.enable_streaming_engine,
additionalUserLabels=user_labels_value(
additionalProperties=user_labels_list)
if user_labels_list else None,
additionalExperiments=(
template_args.additional_experiments
if template_args.additional_experiments else
[]
),
additionalPipelineOptions=(
template_args.additional_pipeline_options
if template_args.additional_pipeline_options
else []
)),
parameters=Templates.PARAMETERS_VALUE(
additionalProperties=params_list) if parameters else None)
request = GetMessagesModule(
).DataflowProjectsLocationsTemplatesCreateRequest(
projectId=template_args.project_id or GetProject(),
location=region_id,
createJobFromTemplateRequest=body)
try:
return Templates.GetService().Create(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def LaunchDynamicTemplate(template_args=None):
"""Calls the Dataflow Templates.LaunchTemplate method on a dynamic template.
Args:
template_args: Arguments to create template. gcs_location must point to a
Json serialized DynamicTemplateFileSpec.
Returns:
(LaunchTemplateResponse)
"""
params_list = []
parameters = template_args.parameters
for k, v in six.iteritems(parameters) if parameters else {}:
params_list.append(
Templates.LAUNCH_TEMPLATE_PARAMETERS_VALUE.AdditionalProperty(
key=k, value=v))
transform_mapping_list = Templates.__ConvertDictArguments(
template_args.transform_name_mappings,
Templates.DYNAMIC_TEMPLATE_TRANSFORM_NAME_MAPPING_VALUE,
)
transform_mappings = None
streaming_update = None
if template_args.streaming_update:
streaming_update = template_args.streaming_update
if transform_mapping_list:
transform_mappings = (
Templates.DYNAMIC_TEMPLATE_TRANSFORM_NAME_MAPPING_VALUE(
additionalProperties=transform_mapping_list
)
)
# TODO(b/139889563): Remove default when args region is changed to required
region_id = template_args.region_id or DATAFLOW_API_DEFAULT_REGION
ip_configuration_enum = GetMessagesModule(
).RuntimeEnvironment.IpConfigurationValueValuesEnum
ip_private = ip_configuration_enum.WORKER_IP_PRIVATE
ip_configuration = ip_private if template_args.disable_public_ips else None
body = Templates.LAUNCH_TEMPLATE_PARAMETERS(
environment=GetMessagesModule().RuntimeEnvironment(
serviceAccountEmail=template_args.service_account_email,
zone=template_args.zone,
maxWorkers=template_args.max_workers,
numWorkers=template_args.num_workers,
network=template_args.network,
subnetwork=template_args.subnetwork,
machineType=template_args.worker_machine_type,
tempLocation=template_args.staging_location,
kmsKeyName=template_args.kms_key_name,
ipConfiguration=ip_configuration,
workerRegion=template_args.worker_region,
workerZone=template_args.worker_zone,
enableStreamingEngine=template_args.enable_streaming_engine,
additionalExperiments=(
template_args.additional_experiments
if template_args.additional_experiments
else []
),
),
jobName=template_args.job_name,
parameters=Templates.LAUNCH_TEMPLATE_PARAMETERS_VALUE(
additionalProperties=params_list) if params_list else None,
update=streaming_update,
transformNameMapping=transform_mappings,
)
request = (
GetMessagesModule().DataflowProjectsLocationsTemplatesLaunchRequest(
gcsPath=template_args.gcs_location,
location=region_id,
launchTemplateParameters=body,
projectId=template_args.project_id or GetProject(),
validateOnly=False,
)
)
try:
return Templates.GetService().Launch(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def __ConvertDictArguments(arguments, value_message):
"""Convert dictionary arguments to parameter list .
Args:
arguments: Arguments for create job using template.
value_message: the value message of the arguments
Returns:
List of value_message.AdditionalProperty
"""
params_list = []
if arguments:
for k, v in six.iteritems(arguments):
params_list.append(value_message.AdditionalProperty(key=k, value=v))
return params_list
@staticmethod
def BuildJavaImageDockerfile(flex_template_base_image, pipeline_paths, env):
"""Builds Dockerfile contents for java flex template image.
Args:
flex_template_base_image: SDK version or base image to use.
pipeline_paths: List of paths to pipelines and dependencies.
env: Dictionary of env variables to set in the container image.
Returns:
Dockerfile contents as string.
"""
dockerfile_template = """
FROM {base_image}
{env}
{copy}
{commands}
"""
commands = ''
env['FLEX_TEMPLATE_JAVA_CLASSPATH'] = '/template/*'
envs = ['ENV {}={}'.format(k, v) for k, v in sorted(env.items())]
env_list = '\n'.join(envs)
paths = ' '.join(pipeline_paths)
copy_command = 'COPY {} /template/'.format(paths)
dockerfile_contents = textwrap.dedent(dockerfile_template).format(
base_image=Templates._GetFlexTemplateBaseImage(
flex_template_base_image),
env=env_list,
copy=copy_command,
commands='\n'.join(commands))
return dockerfile_contents
@staticmethod
def BuildPythonImageDockerfile(flex_template_base_image, pipeline_paths, env):
"""Builds Dockerfile contents for python flex template image.
Args:
flex_template_base_image: SDK version or base image to use.
pipeline_paths: List of paths to pipelines and dependencies.
env: Dictionary of env variables to set in the container image.
Returns:
Dockerfile contents as string.
"""
dockerfile_template = """
FROM {base_image}
{env}
{copy}
{commands}
"""
commands = [
'apt-get update',
'apt-get install -y libffi-dev git',
'rm -rf /var/lib/apt/lists/*',
]
env['FLEX_TEMPLATE_PYTHON_PY_FILE'] = (
f'/template/{env["FLEX_TEMPLATE_PYTHON_PY_FILE"]}'
)
if 'FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES' in env:
package_list = env['FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES'].split(',')
if package_list:
packages_path = [f'/template/{package}' for package in package_list]
env['FLEX_TEMPLATE_PYTHON_EXTRA_PACKAGES'] = ','.join(packages_path)
package_arg = ' '.join(packages_path)
commands.append(
f'pip install {package_arg}'
)
if 'FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE' in env:
env['FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE'] = (
f'/template/{env["FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE"]}'
)
commands.append(
'pip install --no-cache-dir -U -r'
f' {env["FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE"]}'
)
commands.append(
'(pip check || (e=$?; echo "Building a container with incompatible'
' dependencies is prevented by default. If you are sure you want to'
' proceed, you need to create your own container image. See:'
' https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates";'
' exit $e))'
)
if 'FLEX_TEMPLATE_PYTHON_SETUP_FILE' in env:
env['FLEX_TEMPLATE_PYTHON_SETUP_FILE'] = (
f'/template/{env["FLEX_TEMPLATE_PYTHON_SETUP_FILE"]}'
)
envs = ['ENV {}={}'.format(k, v) for k, v in sorted(env.items())]
env_list = '\n'.join(envs)
paths = ' '.join(pipeline_paths)
copy_command = 'COPY {} /template/'.format(paths)
dockerfile_contents = textwrap.dedent(dockerfile_template).format(
base_image=Templates._GetFlexTemplateBaseImage(
flex_template_base_image),
env=env_list,
copy=copy_command,
commands='RUN ' + ' && '.join(commands))
return dockerfile_contents
# staticmethod enforced by prior code, this violates the style guide and
# would benefit from a refactor in the future.
# TODO(b/242564654): Add type annotations for arguments when presubmits allow
# them.
@staticmethod
def BuildGoImageDockerfile(flex_template_base_image,
pipeline_paths,
env):
"""Builds Dockerfile contents for go flex template image.
Args:
flex_template_base_image: SDK version or base image to use.
pipeline_paths: Path to pipeline binary.
env: Dictionary of env variables to set in the container image.
Returns:
Dockerfile contents as string.
"""
dockerfile_template = """
FROM {base_image}
{env}
{copy}
"""
env['FLEX_TEMPLATE_GO_BINARY'] = '/template/{}'.format(
env['FLEX_TEMPLATE_GO_BINARY'])
paths = ' '.join(pipeline_paths)
copy_command = 'COPY {} /template/'.format(paths)
envs = [
'ENV {}={}'.format(var, val) for var, val in sorted(env.items())
]
env_list = '\n'.join(envs)
dockerfile_contents = textwrap.dedent(dockerfile_template).format(
base_image=Templates._GetFlexTemplateBaseImage(
flex_template_base_image),
env=env_list,
copy=copy_command)
return dockerfile_contents
@staticmethod
def BuildDockerfile(flex_template_base_image, pipeline_paths, env,
sdk_language):
"""Builds Dockerfile contents for flex template image.
Args:
flex_template_base_image: SDK version or base image to use.
pipeline_paths: List of paths to pipelines and dependencies.
env: Dictionary of env variables to set in the container image.
sdk_language: SDK language of the flex template.
Returns:
Dockerfile contents as string.
"""
if sdk_language == 'JAVA':
return Templates.BuildJavaImageDockerfile(flex_template_base_image,
pipeline_paths, env)
elif sdk_language == 'PYTHON':
return Templates.BuildPythonImageDockerfile(flex_template_base_image,
pipeline_paths, env)
elif sdk_language == 'GO':
return Templates.BuildGoImageDockerfile(flex_template_base_image,
pipeline_paths, env)
@staticmethod
def _ValidateTemplateParameters(parameters):
"""Validates ParameterMetadata objects in template metadata.
Args:
parameters: List of ParameterMetadata objects.
Raises:
ValueError: If is any of the required field is not set.
"""
for parameter in parameters:
if not parameter.name:
raise ValueError(
'Invalid template metadata. Parameter name field is empty.'
' Parameter: {}'.format(parameter))
if not parameter.label:
raise ValueError(
'Invalid template metadata. Parameter label field is empty.'
' Parameter: {}'.format(parameter))
if not parameter.helpText:
raise ValueError(
'Invalid template metadata. Parameter helpText field is empty.'
' Parameter: {}'.format(parameter))
@staticmethod
def __ValidateFlexTemplateEnv(env, sdk_language):
"""Builds and validates Flex template environment values.
Args:
env: Dictionary of env variables to set in the container image.
sdk_language: SDK language of the flex template.
Returns:
True on valid env values.
Raises:
ValueError: If is any of parameter value is invalid.
"""
if sdk_language == 'JAVA' and 'FLEX_TEMPLATE_JAVA_MAIN_CLASS' not in env:
raise ValueError(('FLEX_TEMPLATE_JAVA_MAIN_CLASS environment variable '
'should be provided for all JAVA jobs.'))
elif sdk_language == 'PYTHON' and 'FLEX_TEMPLATE_PYTHON_PY_FILE' not in env:
raise ValueError(('FLEX_TEMPLATE_PYTHON_PY_FILE environment variable '
'should be provided for all PYTHON jobs.'))
elif sdk_language == 'GO' and 'FLEX_TEMPLATE_GO_BINARY' not in env:
raise ValueError(('FLEX_TEMPLATE_GO_BINARY environment variable '
'should be provided for all GO jobs.'))
return True
@staticmethod
def _BuildTemplateMetadata(template_metadata_json):
"""Builds and validates TemplateMetadata object.
Args:
template_metadata_json: Template metadata in json format.
Returns:
TemplateMetadata object on success.
Raises:
ValueError: If is any of the required field is not set.
"""
template_metadata = encoding.JsonToMessage(Templates.TEMPLATE_METADATA,
template_metadata_json)
template_metadata_obj = Templates.TEMPLATE_METADATA()
if not template_metadata.name:
raise ValueError('Invalid template metadata. Name field is empty.'
' Template Metadata: {}'.format(template_metadata))
template_metadata_obj.name = template_metadata.name
if template_metadata.description:
template_metadata_obj.description = template_metadata.description
if template_metadata.parameters:
Templates._ValidateTemplateParameters(template_metadata.parameters)
template_metadata_obj.parameters = template_metadata.parameters
if template_metadata.yamlDefinition:
template_metadata_obj.yamlDefinition = template_metadata.yamlDefinition
return template_metadata_obj
@staticmethod
def GetYamlTemplateImage(args):
"""Returns the image path for a YAML template."""
if args.image:
return args.image
elif args.yaml_image:
return args.yaml_image
# TODO: b/397983834 - Try to extract a region from the gcs bucket.
if args.worker_region:
try:
return Templates._ExtractYamlTemplateImage(args.worker_region)
except exceptions.HttpException:
pass # Fall through to using default region.
return Templates._ExtractYamlTemplateImage(DATAFLOW_API_DEFAULT_REGION)
@staticmethod
def _ExtractYamlTemplateImage(region_id):
"""Returns the image path for a YAML template."""
yaml_gcl_template_path = Templates.YAML_TEMPLATE_GCS_LOCATION.format(
region_id
)
storage_client = storage_api.StorageClient()
obj_ref = storage_util.ObjectReference.FromUrl(yaml_gcl_template_path)
try:
generic_template_definition = json.load(
storage_client.ReadObject(obj_ref)
)
except Exception as e:
raise exceptions.HttpException(
'Unable to read file {0} due to incorrect file path or insufficient'
' read permissions'.format(yaml_gcl_template_path)
) from e
return generic_template_definition['image']
@staticmethod
def _GetFlexTemplateBaseImage(flex_template_base_image):
"""Returns latest base image for given sdk version.
Args:
flex_template_base_image: SDK version or base image to use.
Returns:
If a custom base image value is given, returns the same value. Else,
returns the latest base image for the given sdk version.
"""
if flex_template_base_image == 'JAVA11':
return Templates.FLEX_TEMPLATE_JAVA11_BASE_IMAGE
elif flex_template_base_image == 'JAVA17':
return Templates.FLEX_TEMPLATE_JAVA17_BASE_IMAGE
elif flex_template_base_image == 'JAVA21':
return Templates.FLEX_TEMPLATE_JAVA21_BASE_IMAGE
elif flex_template_base_image == 'JAVA11_DISTROLESS':
return Templates.FLEX_TEMPLATE_JAVA11_DISTROLESS_BASE_IMAGE
elif flex_template_base_image == 'JAVA17_DISTROLESS':
return Templates.FLEX_TEMPLATE_JAVA17_DISTROLESS_BASE_IMAGE
elif flex_template_base_image == 'JAVA21_DISTROLESS':
return Templates.FLEX_TEMPLATE_JAVA21_DISTROLESS_BASE_IMAGE
elif flex_template_base_image == 'JAVA8':
log.warning(
'JAVA8 is deprecated and redirected to JAVA11. This option '
'will be removed in a future release'
)
return Templates.FLEX_TEMPLATE_JAVA11_BASE_IMAGE
elif flex_template_base_image == 'PYTHON3':
return Templates.FLEX_TEMPLATE_PYTHON3_BASE_IMAGE
elif flex_template_base_image == 'GO':
return Templates.FLEX_TEMPLATE_GO_BASE_IMAGE
elif flex_template_base_image == 'GO_DISTROLESS':
return Templates.FLEX_TEMPLATE_GO_DISTROLESS_BASE_IMAGE
return flex_template_base_image
@staticmethod
def _BuildSDKInfo(sdk_language):
"""Builds SDKInfo object.
Args:
sdk_language: SDK language of the flex template.
Returns:
SDKInfo object
"""
if sdk_language == 'JAVA':
return Templates.SDK_INFO(language=Templates.SDK_LANGUAGE.JAVA)
elif sdk_language == 'PYTHON':
return Templates.SDK_INFO(language=Templates.SDK_LANGUAGE.PYTHON)
elif sdk_language == 'YAML':
return Templates.SDK_INFO(language=Templates.SDK_LANGUAGE.YAML)
elif sdk_language == 'GO':
return Templates.SDK_INFO(language=Templates.SDK_LANGUAGE.GO)
@staticmethod
def _StoreFlexTemplateFile(template_file_gcs_location, container_spec_json):
"""Stores flex template container spec file in GCS.
Args:
template_file_gcs_location: GCS location to store the template file.
container_spec_json: Container spec in json format.
Returns:
Returns the stored flex template file gcs object on success.
Propagates the error on failures.
"""
with files.TemporaryDirectory() as temp_dir:
local_path = os.path.join(temp_dir, 'template-file.json')
# Use Unix style line-endings on all platforms (especially Windows)
files.WriteFileContents(local_path, container_spec_json, newline='\n')
storage_client = storage_api.StorageClient()
obj_ref = storage_util.ObjectReference.FromUrl(template_file_gcs_location)
return storage_client.CopyFileToGCS(local_path, obj_ref)
@staticmethod
def BuildAndStoreFlexTemplateFile(template_file_gcs_location,
image,
template_metadata_json,
sdk_language,
print_only,
template_args=None,
image_repository_username_secret_id=None,
image_repository_password_secret_id=None,
image_repository_cert_path=None):
"""Builds container spec and stores it in the flex template file in GCS.
Args:
template_file_gcs_location: GCS location to store the template file.
image: Path to the container image.
template_metadata_json: Template metadata in json format.
sdk_language: SDK language of the flex template.
print_only: Only prints the container spec and skips write to GCS.
template_args: Default runtime parameters specified by template authors.
image_repository_username_secret_id: Secret manager secret id for username
to authenticate to private registry.
image_repository_password_secret_id: Secret manager secret id for password
to authenticate to private registry.
image_repository_cert_path: The full URL to self-signed certificate of
private registry in Cloud Storage.
Returns:
Container spec json if print_only is set. A success message with template
file GCS path and container spec otherewise.
"""
template_metadata = None
if template_metadata_json:
template_metadata = Templates._BuildTemplateMetadata(
template_metadata_json)
sdk_info = Templates._BuildSDKInfo(sdk_language)
default_environment = None
if template_args:
user_labels_list = Templates.__ConvertDictArguments(
template_args.additional_user_labels,
Templates.FLEX_TEMPLATE_USER_LABELS_VALUE)
ip_private = Templates.IP_CONFIGURATION_ENUM_VALUE.WORKER_IP_PRIVATE
ip_configuration = (
ip_private if template_args.disable_public_ips else None
)
enable_streaming_engine = (
True if template_args.enable_streaming_engine else None
)
default_environment = Templates.FLEX_TEMPLATE_ENVIRONMENT(
serviceAccountEmail=template_args.service_account_email,
maxWorkers=template_args.max_workers,
numWorkers=template_args.num_workers,
network=template_args.network,
subnetwork=template_args.subnetwork,
machineType=template_args.worker_machine_type,
tempLocation=template_args.temp_location
if template_args.temp_location else template_args.staging_location,
stagingLocation=template_args.staging_location,
kmsKeyName=template_args.kms_key_name,
ipConfiguration=ip_configuration,
workerRegion=template_args.worker_region,
workerZone=template_args.worker_zone,
enableStreamingEngine=enable_streaming_engine,
additionalExperiments=(template_args.additional_experiments if
template_args.additional_experiments else []),
additionalUserLabels=Templates.FLEX_TEMPLATE_USER_LABELS_VALUE(
additionalProperties=user_labels_list)
if user_labels_list else None)
container_spec = Templates.CONTAINER_SPEC(
image=image,
metadata=template_metadata,
sdkInfo=sdk_info,
defaultEnvironment=default_environment,
imageRepositoryUsernameSecretId=image_repository_username_secret_id,
imageRepositoryPasswordSecretId=image_repository_password_secret_id,
imageRepositoryCertPath=image_repository_cert_path)
container_spec_json = encoding.MessageToJson(container_spec)
container_spec_pretty_json = json.dumps(
json.loads(container_spec_json),
sort_keys=True,
indent=4,
separators=(',', ': '))
if print_only:
return container_spec_pretty_json
try:
Templates._StoreFlexTemplateFile(template_file_gcs_location,
container_spec_pretty_json)
log.status.Print(
'Successfully saved container spec in flex template file.\n'
'Template File GCS Location: {}\n'
'Container Spec:\n\n'
'{}'.format(template_file_gcs_location, container_spec_pretty_json))
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def _AddPermissions(path, permissions):
"""Adds the given permissions to a file or directory.
Args:
path: The path to the file or directory.
permissions: The permissions to add.
Raises:
OSError: If the chmod fails.
"""
permissions = (
os.stat(path).st_mode & Templates.ALL_PERMISSIONS_MASK
) | permissions
os.chmod(path, permissions)
@staticmethod
def _ChmodRWorldReadable(top_dir_path):
"""Walks a dir to chmod itself and its contents with the configured access.
Args:
top_dir_path: The path to the top-level directory.
Raises:
OSError: If the chmod fails.
"""
for dirpath, _, filenames in os.walk(top_dir_path):
Templates._AddPermissions(dirpath, Templates.DIR_PERMISSIONS_MASK)
for filename in filenames:
Templates._AddPermissions(
os.path.join(dirpath, filename), Templates.FILE_PERMISSIONS_MASK
)
@staticmethod
def BuildAndStoreFlexTemplateImage(
image_gcr_path,
flex_template_base_image,
jar_paths,
py_paths,
go_binary_path,
env,
sdk_language,
gcs_log_dir,
cloud_build_service_account,
):
"""Builds the flex template docker container image and stores it in GCR.
Args:
image_gcr_path: GCR location to store the flex template container image.
flex_template_base_image: SDK version or base image to use.
jar_paths: List of jar paths to pipelines and dependencies.
py_paths: List of python paths to pipelines and dependencies.
go_binary_path: Path to compiled Go pipeline binary.
env: Dictionary of env variables to set in the container image.
sdk_language: SDK language of the flex template.
gcs_log_dir: Path to Google Cloud Storage directory to store build logs.
cloud_build_service_account: Service account to be used by Cloud
Build to build the image.
Returns:
True if container is built and store successfully.
Raises:
ValueError: If the parameters values are invalid.
"""
Templates.__ValidateFlexTemplateEnv(env, sdk_language)
with files.TemporaryDirectory() as temp_dir:
log.status.Print('Copying files to a temp directory {}'.format(temp_dir))
pipeline_files = []
paths = jar_paths
if py_paths:
paths = py_paths
elif go_binary_path:
paths = [go_binary_path]
for path in paths:
absl_path = os.path.abspath(path)
if os.path.isfile(absl_path):
copy_file = shutil.copy2(absl_path, temp_dir)
# Add the configured access to support non-root container execution.
try:
Templates._AddPermissions(
copy_file, Templates.FILE_PERMISSIONS_MASK
)
except OSError:
log.warning(
'Could not adjust permissions for copied file {}'.format(
copy_file
)
)
else:
copy_dir = shutil.copytree(
absl_path,
os.path.join(temp_dir, os.path.basename(absl_path)),
)
# Add the configured access to support non-root container execution.
try:
Templates._ChmodRWorldReadable(copy_dir)
except OSError:
log.warning(
'Could not adjust permissions for copied directory {}'.format(
copy_dir
)
)
pipeline_files.append(os.path.split(absl_path)[1])
log.status.Print(
'Generating dockerfile to build the flex template container image...')
dockerfile_contents = Templates.BuildDockerfile(flex_template_base_image,
pipeline_files, env,
sdk_language)
dockerfile_path = os.path.join(temp_dir, 'Dockerfile')
files.WriteFileContents(dockerfile_path, dockerfile_contents)
log.status.Print(
'Generated Dockerfile. Contents: {}'.format(dockerfile_contents))
messages = cloudbuild_util.GetMessagesModule()
build_config = submit_util.CreateBuildConfig(
tag=image_gcr_path,
no_cache=False,
messages=messages,
substitutions=None,
arg_config='cloudbuild.yaml',
is_specified_source=True,
no_source=False,
source=temp_dir,
gcs_source_staging_dir=None,
ignore_file=None,
arg_gcs_log_dir=gcs_log_dir,
arg_machine_type=None,
arg_disk_size=None,
arg_worker_pool=None,
arg_dir=None,
arg_revision=None,
arg_git_source_dir=None,
arg_git_source_revision=None,
arg_service_account=(
cloud_build_service_account
if cloud_build_service_account
else None
),
buildpack=None,
)
log.status.Print('Pushing flex template container image to GCR...')
submit_util.Build(messages, False, build_config)
return True
@staticmethod
def CreateJobFromFlexTemplate(template_args=None):
"""Calls the create job from flex template APIs.
Args:
template_args: Arguments for create template.
Returns:
(Job)
"""
params_list = Templates.__ConvertDictArguments(
template_args.parameters, Templates.FLEX_TEMPLATE_PARAMETERS_VALUE)
transform_mapping_list = Templates.__ConvertDictArguments(
template_args.transform_name_mappings,
Templates.FLEX_TEMPLATE_TRANSFORM_NAME_MAPPING_VALUE)
transform_mappings = None
streaming_update = None
if template_args.streaming_update:
streaming_update = template_args.streaming_update
if transform_mapping_list:
transform_mappings = (
Templates.FLEX_TEMPLATE_TRANSFORM_NAME_MAPPING_VALUE(
additionalProperties=transform_mapping_list
)
)
user_labels_list = Templates.__ConvertDictArguments(
template_args.additional_user_labels,
Templates.FLEX_TEMPLATE_USER_LABELS_VALUE)
# TODO(b/139889563): Remove default when args region is changed to required
region_id = template_args.region_id or DATAFLOW_API_DEFAULT_REGION
ip_private = Templates.IP_CONFIGURATION_ENUM_VALUE.WORKER_IP_PRIVATE
ip_configuration = ip_private if template_args.disable_public_ips else None
flexrs_goal = None
if template_args.flexrs_goal:
if template_args.flexrs_goal == 'SPEED_OPTIMIZED':
flexrs_goal = Templates.FLEXRS_GOAL_ENUM_VALUE.FLEXRS_SPEED_OPTIMIZED
elif template_args.flexrs_goal == 'COST_OPTIMIZED':
flexrs_goal = Templates.FLEXRS_GOAL_ENUM_VALUE.FLEXRS_COST_OPTIMIZED
body = Templates.LAUNCH_FLEX_TEMPLATE_REQUEST(
launchParameter=Templates.FLEX_TEMPLATE_PARAMETER(
jobName=template_args.job_name,
containerSpecGcsPath=template_args.gcs_location,
environment=Templates.FLEX_TEMPLATE_ENVIRONMENT(
serviceAccountEmail=template_args.service_account_email,
maxWorkers=template_args.max_workers,
numWorkers=template_args.num_workers,
network=template_args.network,
subnetwork=template_args.subnetwork,
machineType=template_args.worker_machine_type,
launcherMachineType=template_args.launcher_machine_type,
tempLocation=template_args.temp_location if template_args
.temp_location else template_args.staging_location,
stagingLocation=template_args.staging_location,
kmsKeyName=template_args.kms_key_name,
ipConfiguration=ip_configuration,
workerRegion=template_args.worker_region,
workerZone=template_args.worker_zone,
enableStreamingEngine=template_args.enable_streaming_engine,
flexrsGoal=flexrs_goal,
additionalExperiments=(
template_args.additional_experiments if template_args
.additional_experiments else []),
additionalUserLabels=Templates.FLEX_TEMPLATE_USER_LABELS_VALUE(
additionalProperties=user_labels_list
) if user_labels_list else None,
additionalPipelineOptions=(
template_args.additional_pipeline_options
if template_args.additional_pipeline_options
else []
)),
update=streaming_update,
transformNameMappings=transform_mappings,
parameters=Templates.FLEX_TEMPLATE_PARAMETERS_VALUE(
additionalProperties=params_list) if params_list else None))
request = GetMessagesModule(
).DataflowProjectsLocationsFlexTemplatesLaunchRequest(
projectId=template_args.project_id or GetProject(),
location=region_id,
launchFlexTemplateRequest=body)
try:
return Templates.GetFlexTemplateService().Launch(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
class Messages:
"""The Messages set of Dataflow API functions."""
LIST_REQUEST = GetMessagesModule(
).DataflowProjectsLocationsJobsMessagesListRequest
@staticmethod
def GetService():
return GetClientInstance().projects_locations_jobs_messages
@staticmethod
def List(job_id,
project_id=None,
region_id=None,
minimum_importance=None,
start_time=None,
end_time=None,
page_size=None,
page_token=None):
"""Calls the Dataflow Metrics.Get method.
Args:
job_id: The job to get messages about.
project_id: The project which owns the job.
region_id: The regional endpoint of the job.
minimum_importance: Filter to only get messages with importance >= level
start_time: If specified, return only messages with timestamps >=
start_time. The default is the job creation time (i.e. beginning of
messages).
end_time: Return only messages with timestamps < end_time. The default is
now (i.e. return up to the latest messages available).
page_size: If specified, determines the maximum number of messages to
return. If unspecified, the service may choose an appropriate default,
or may return an arbitrarily large number of results.
page_token: If supplied, this should be the value of next_page_token
returned by an earlier call. This will cause the next page of results to
be returned.
Returns:
(ListJobMessagesResponse)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
request = GetMessagesModule(
).DataflowProjectsLocationsJobsMessagesListRequest(
jobId=job_id,
location=region_id,
projectId=project_id,
startTime=start_time,
endTime=end_time,
minimumImportance=minimum_importance,
pageSize=page_size,
pageToken=page_token)
try:
return Messages.GetService().List(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
class Snapshots:
"""Cloud Dataflow snapshots api."""
@staticmethod
def GetService():
return GetClientInstance().projects_locations_snapshots
@staticmethod
def Delete(snapshot_id=None, project_id=None, region_id=None):
"""Calls the Dataflow Snapshots.Delete method.
Args:
snapshot_id: The id of the snapshot to delete.
project_id: The project that owns the snapshot.
region_id: The regional endpoint of the snapshot.
Returns:
(DeleteSnapshotResponse)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
request = GetMessagesModule(
).DataflowProjectsLocationsSnapshotsDeleteRequest(
snapshotId=snapshot_id, location=region_id, projectId=project_id)
try:
return Snapshots.GetService().Delete(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def Get(snapshot_id=None, project_id=None, region_id=None):
"""Calls the Dataflow Snapshots.Get method.
Args:
snapshot_id: The id of the snapshot to get.
project_id: The project that owns the snapshot.
region_id: The regional endpoint of the snapshot.
Returns:
(GetSnapshotResponse)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
request = GetMessagesModule().DataflowProjectsLocationsSnapshotsGetRequest(
snapshotId=snapshot_id, location=region_id, projectId=project_id)
try:
return Snapshots.GetService().Get(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)
@staticmethod
def List(job_id=None, project_id=None, region_id=None):
"""Calls the Dataflow Snapshots.List method.
Args:
job_id: If specified, only snapshots associated with the job will be
returned.
project_id: The project that owns the snapshot.
region_id: The regional endpoint of the snapshot.
Returns:
(ListSnapshotsResponse)
"""
project_id = project_id or GetProject()
# TODO(b/139889563): Remove default when args region is changed to required
region_id = region_id or DATAFLOW_API_DEFAULT_REGION
request = GetMessagesModule().DataflowProjectsLocationsSnapshotsListRequest(
jobId=job_id, location=region_id, projectId=project_id)
try:
return Snapshots.GetService().List(request)
except apitools_exceptions.HttpError as error:
raise exceptions.HttpException(error)