File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/run/compose/builder.py
# -*- coding: utf-8 -*- #
# Copyright 2025 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Build config for Run Compose."""
import os
from typing import Any, Dict, Optional
from apitools.base.py import encoding
from googlecloudsdk.api_lib.cloudbuild import cloudbuild_util
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.command_lib.builds import submit_util
from googlecloudsdk.command_lib.run import connection_context
from googlecloudsdk.command_lib.run import platforms
from googlecloudsdk.command_lib.run import serverless_operations
from googlecloudsdk.command_lib.run.compose import tracker as tracker_stages
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import parallel
class BuildConfig:
  """Represents the build configuration for a service."""
  def __init__(
      self, context: Optional[str] = None, dockerfile: Optional[str] = None
  ):
    self.context = context
    self.dockerfile = dockerfile
    self.image_id: Optional[str] = None
  @classmethod
  def from_dict(cls, data: Dict[str, Any]) -> 'BuildConfig':
    return cls(
        context=data.get('context'),
        dockerfile=data.get('dockerfile'),
    )
  def to_dict(self) -> Dict[str, Any]:
    """Serializes the BuildConfig instance to a dictionary."""
    return {
        'context': self.context,
        'dockerfile': self.dockerfile,
        'image_id': self.image_id,
    }
def handle(
    source_build: Dict[str, BuildConfig],
    repo: str,
    project_name: str,
    region: str,
    tracker: progress_tracker.StagedProgressTracker,
    no_build: bool = False,
) -> None:
  """Performs source builds for all containers in parallel."""
  if no_build:
    _handle_no_build(source_build, project_name, region, tracker)
    return
  build_ops = []
  for container, build_config in source_build.items():
    try:
      build_op_ref, build_log_url, image_tag = _build_from_source(
          build_config, container, repo, project_name, region, tracker
      )
      build_ops.append(
          (container, build_config, build_op_ref, build_log_url, image_tag)
      )
    except submit_util.FailedBuildException as e:
      log.error(f'Build failed for container {container}: {e}')
      raise
    except exceptions.Error as e:
      log.error(f'An error occurred during build submission: {e}')
      raise
  if not build_ops:
    return
  def _run_build(args):
    container, build_config, build_op_ref, build_log_url, image_tag = args
    return _poll_and_handle_build_result(
        container, build_config, build_op_ref, build_log_url, tracker, image_tag
    )
  task_args = [
      (
          container,
          build_config,
          build_op_ref,
          build_log_url,
          image_tag,
      )
      for container, build_config, build_op_ref, build_log_url, image_tag in build_ops
  ]
  num_threads = min(len(task_args), 10)
  with parallel.GetPool(num_threads) as pool:
    results = pool.Map(_run_build, task_args)
  if not all(results):
    raise exceptions.Error('One or more container builds failed.')
def _poll_and_handle_build_result(
    container: str,
    build_config: BuildConfig,
    build_op_ref: resources.Resource,
    build_log_url: str,
    tracker: progress_tracker.StagedProgressTracker,  # pytype: disable=invalid-annotation
    image_tag: str,
) -> bool:
  """Polls a build operation and updates the tracker."""
  try:
    response_dict = _poll_until_build_completes(build_op_ref)
    if response_dict and response_dict['status'] != 'SUCCESS':
      tracker.FailStage(
          tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
              container=container
          ),
          None,
          message=(
              'Container build failed and logs are available at'
              ' [{build_log_url}].'.format(build_log_url=build_log_url)
          ),
      )
      return False
    else:
      image_with_digest = None
      if response_dict.get('results') and response_dict.get('results').get(
          'images'
      ):
        for img in response_dict.get('results').get('images'):
          if img.get('name') == image_tag:
            digest = img.get('digest')
            if digest:
              image_name_without_tag, _, _ = image_tag.rpartition(':')
              image_with_digest = image_name_without_tag + '@' + digest
              break
      if image_with_digest:
        build_config.image_id = image_with_digest
        log.debug(f"Image '{image_with_digest}' created.")
      else:
        build_config.image_id = image_tag
        log.debug(f"Image '{image_tag}' created.")
      tracker.CompleteStage(
          tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
              container=container
          )
      )
      return True
  except exceptions.Error as e:
    log.error(f'An error occurred while waiting for build of {container}: {e}')
    tracker.FailStage(
        tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
            container=container
        ),
        None,
        message=f'Error waiting for build to complete: {e}',
    )
    return False
def _get_service(service_name, region):
  """Get service if it exists, else return None."""
  project = properties.VALUES.core.project.Get(required=True)
  conn_context = connection_context.GetConnectionContext(
      None,
      platforms.PLATFORM_MANAGED,
      region_label=region,
  )
  service_ref = resources.REGISTRY.Parse(
      service_name,
      params={'namespacesId': project},
      collection='run.namespaces.services',
  )
  with serverless_operations.Connect(conn_context) as client:
    return client.GetService(service_ref)
def _handle_no_build(
    source_build: Dict[str, BuildConfig],
    project_name: str,
    region: str,
    tracker: progress_tracker.StagedProgressTracker,
) -> None:
  """Handles --no-build flag."""
  if not source_build:
    return
  project = properties.VALUES.core.project.Get(required=True)
  service_ref = resources.REGISTRY.Parse(
      project_name,
      params={'namespacesId': project},
      collection='run.namespaces.services',
  )
  conn_context = connection_context.GetConnectionContext(
      None,
      platform=platforms.PLATFORM_MANAGED,
      region_label=region,
  )
  with serverless_operations.Connect(conn_context) as client:
    existing_service = client.GetService(service_ref)
  if not existing_service:
    raise exceptions.Error(
        '--no-build cannot be used for the first deployment of service'
        f" '{project_name}'."
    )
  container_to_image_map = {}
  if existing_service and existing_service.template.spec.containers:
    for c in existing_service.template.spec.containers:
      container_to_image_map[c.name] = c.image
  for container, build_config in source_build.items():
    stage_key = tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
        container=container
    )
    try:
      tracker.StartStage(stage_key)
      image = container_to_image_map.get(container)
      if not image:
        raise exceptions.Error(
            f"Could not find image for container '{container}' in service"
            f" '{project_name}'."
        )
      build_config.image_id = image
      tracker.UpdateStage(
          stage_key, f'Using image [{image}] from deployed service.'
      )
      tracker.CompleteStage(stage_key)
    except Exception as e:
      tracker.FailStage(stage_key, e, 'Image retrieval failed.')
      raise
def _write_cloudbuild_config(context: str, image_tag: str) -> str:
  """Writes a cloudbuild.yaml file to the service source directory.
  Args:
    context: The build context directory.
    image_tag: The full tag for the image to be built.
  Returns:
    The path to the written cloudbuild.yaml file.
  """
  config_data = {
      'steps': [{
          'id': f'Build Docker Image: {image_tag}',
          'name': 'gcr.io/cloud-builders/docker',
          'args': ['buildx', 'build', '--load', '-t', image_tag, '.'],
      }],
      'images': [image_tag],
  }
  out_dir = os.path.join(context, 'out')
  file_path = os.path.join(out_dir, 'cloudbuild.yaml')
  try:
    files.MakeDir(out_dir)
    with files.FileWriter(file_path) as f:
      yaml.dump(config_data, f)
    log.debug(f"Wrote Cloud Build config to '{file_path}'")
    return file_path
  except Exception as e:
    raise exceptions.Error(
        f"Failed to write Cloud Build config to '{file_path}': {e}"
    )
def _build_from_source(
    config: BuildConfig,
    container: str,
    repo: str,
    project_name: str,
    region: str,
    tracker: progress_tracker.StagedProgressTracker,
) -> tuple[resources.Resource, str, str]:
  """Performs source build for a given container using build config."""
  source_path = config.context
  if source_path is None:
    raise ValueError('Build context is required for source build.')
  image_tag = '{repo}/{project_name}_{container}:{tag}'.format(
      repo=repo, project_name=project_name, container=container, tag='latest'
  )
  # Write the cloudbuild.yaml file to the service source directory.
  config_path = _write_cloudbuild_config(source_path, image_tag)
  # Get the Cloud Build API message module
  messages = cloudbuild_util.GetMessagesModule()
  # Create the build configuration. This will upload the source
  # and set up the build steps to use the Dockerfile.
  log.debug(
      f"Creating build config for image '{image_tag}' from source"
      f" '{source_path}'"
  )
  build_config = submit_util.CreateBuildConfig(
      tag=None,
      no_cache=False,
      messages=messages,
      substitutions=None,
      arg_config=config_path,
      is_specified_source=True,
      no_source=False,
      source=source_path,
      gcs_source_staging_dir=None,
      ignore_file=None,
      arg_gcs_log_dir=None,
      arg_machine_type=None,
      arg_disk_size=None,
      arg_worker_pool=None,
      arg_dir=None,
      arg_revision=None,
      arg_git_source_dir=None,
      arg_git_source_revision=None,
      arg_service_account=None,
      buildpack=None,
      hide_logs=True,
      # skip_set_source defaults to False, so SetSource is called internally
  )
  log.debug('Submitting build to Google Cloud Build')
  build_op_ref, build_log_url = _build_using_cloud_build(
      container, tracker, messages, build_config, region
  )
  return build_op_ref, build_log_url, image_tag
def _build_using_cloud_build(
    container, tracker, build_messages, build_config, region
):
  """Build an image from source if a user specifies a source when deploying."""
  build, _ = submit_util.Build(
      build_messages,
      True,
      build_config,
      hide_logs=True,
      build_region=region,
  )
  build_op = (
      f'projects/{build.projectId}/locations/{region}/operations/{build.id}'
  )
  build_op_ref = resources.REGISTRY.ParseRelativeName(
      build_op, collection='cloudbuild.projects.locations.operations'
  )
  build_log_url = build.logUrl
  stage_key = tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
      container=container
  )
  tracker.StartStage(stage_key)
  tracker.UpdateStage(
      stage_key,
      'Logs are available at [{build_log_url}].'.format(
          build_log_url=build_log_url
      ),
  )
  return build_op_ref, build_log_url
def _poll_until_build_completes(build_op_ref):
  client = cloudbuild_util.GetClientInstance()
  poller = waiter.CloudOperationPoller(
      client.projects_builds, client.operations
  )
  operation = waiter.PollUntilDone(poller, build_op_ref)
  return encoding.MessageToPyValue(operation.response)