HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/run/compose/builder.py
# -*- coding: utf-8 -*- #
# Copyright 2025 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Build config for Run Compose."""

import os
from typing import Any, Dict, Optional

from apitools.base.py import encoding
from googlecloudsdk.api_lib.cloudbuild import cloudbuild_util
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.command_lib.builds import submit_util
from googlecloudsdk.command_lib.run import connection_context
from googlecloudsdk.command_lib.run import platforms
from googlecloudsdk.command_lib.run import serverless_operations
from googlecloudsdk.command_lib.run.compose import tracker as tracker_stages
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import parallel


class BuildConfig:
  """Build settings for a single service.

  Attributes:
    context: Build context directory, or None when not provided.
    dockerfile: Dockerfile value taken from the config, or None.
    image_id: Image reference for the service. Always None on construction;
      it is assigned later by code outside this class.
  """

  _SERIALIZED_FIELDS = ('context', 'dockerfile', 'image_id')

  def __init__(
      self, context: Optional[str] = None, dockerfile: Optional[str] = None
  ):
    self.context = context
    self.dockerfile = dockerfile
    self.image_id: Optional[str] = None

  @classmethod
  def from_dict(cls, data: Dict[str, Any]) -> 'BuildConfig':
    """Creates a BuildConfig from a mapping.

    Only the 'context' and 'dockerfile' keys are read; missing keys become
    None and every other key (including 'image_id') is ignored.

    Args:
      data: Mapping holding the build settings.

    Returns:
      A new BuildConfig whose image_id is None.
    """
    context = data.get('context')
    dockerfile = data.get('dockerfile')
    return cls(context=context, dockerfile=dockerfile)

  def to_dict(self) -> Dict[str, Any]:
    """Returns the context, dockerfile and image_id as a plain dictionary."""
    return {name: getattr(self, name) for name in self._SERIALIZED_FIELDS}


def handle(
    source_build: Dict[str, BuildConfig],
    repo: str,
    project_name: str,
    region: str,
    tracker: progress_tracker.StagedProgressTracker,
    no_build: bool = False,
) -> None:
  """Builds the image of every container, or reuses already deployed images.

  Builds are submitted one after another; the resulting operations are then
  polled concurrently on a pool of at most 10 workers.

  Args:
    source_build: Maps each container name to its BuildConfig.
    repo: Repository prefix used to form the image tags.
    project_name: Compose project name; also used as the service name when
      no_build is set.
    region: Region in which the builds run.
    tracker: Staged progress tracker that receives per-container updates.
    no_build: When True, no build is submitted and the work is delegated to
      _handle_no_build.

  Raises:
    submit_util.FailedBuildException: Re-raised after logging when a build
      submission reports a failed build.
    exceptions.Error: Re-raised after logging when a submission fails, or
      raised when at least one polled build did not succeed.
  """
  if no_build:
    _handle_no_build(source_build, project_name, region, tracker)
    return

  # Phase 1: submit every build and remember what is needed to poll it.
  pending = []
  for container, build_config in source_build.items():
    try:
      op_ref, log_url, tag = _build_from_source(
          build_config, container, repo, project_name, region, tracker
      )
    except submit_util.FailedBuildException as e:
      log.error(f'Build failed for container {container}: {e}')
      raise
    except exceptions.Error as e:
      log.error(f'An error occurred during build submission: {e}')
      raise
    pending.append((container, build_config, op_ref, log_url, tag))

  if not pending:
    return

  def _await_build(submitted):
    name, config, op_ref, log_url, tag = submitted
    return _poll_and_handle_build_result(
        name, config, op_ref, log_url, tracker, tag
    )

  # Phase 2: wait for all submitted builds concurrently.
  worker_count = min(len(pending), 10)
  with parallel.GetPool(worker_count) as pool:
    outcomes = pool.Map(_await_build, pending)

  if not all(outcomes):
    raise exceptions.Error('One or more container builds failed.')


def _image_with_digest(
    response: Dict[str, Any], image_tag: str
) -> Optional[str]:
  """Returns 'name@digest' for image_tag from a build response, else None.

  Args:
    response: Build response as a plain dictionary (may be empty).
    image_tag: Image tag that was requested from the build.

  Returns:
    The image pinned to the digest of the first result entry whose name
    equals image_tag and that carries a digest, or None if there is none.
  """
  images = (response.get('results') or {}).get('images') or []
  for img in images:
    digest = img.get('digest')
    if img.get('name') != image_tag or not digest:
      continue
    name, sep, tag = image_tag.rpartition(':')
    # A tag never contains '/'. If the text after the last ':' does, that
    # colon belongs to a registry port and the reference has no tag to strip.
    if not sep or '/' in tag:
      name = image_tag
    return f'{name}@{digest}'
  return None


def _poll_and_handle_build_result(
    container: str,
    build_config: BuildConfig,
    build_op_ref: resources.Resource,
    build_log_url: str,
    tracker: progress_tracker.StagedProgressTracker,  # pytype: disable=invalid-annotation
    image_tag: str,
) -> bool:
  """Waits for one build operation and records its outcome.

  On success build_config.image_id is set to the digest-pinned image when the
  build response reports a digest for image_tag, and to image_tag otherwise;
  the container's BUILD stage is then completed. On failure the BUILD stage is
  marked failed.

  Args:
    container: Name of the container being built.
    build_config: Build configuration that receives the resulting image_id.
    build_op_ref: Reference to the build operation to poll.
    build_log_url: URL of the build logs, shown in failure messages.
    tracker: Staged progress tracker to update.
    image_tag: Image tag the build was asked to produce.

  Returns:
    True if the build succeeded, False if it failed or polling raised an
    exceptions.Error.
  """
  stage_key = tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
      container=container
  )
  try:
    # Polling may yield no response at all; normalise to an empty dict so the
    # lookups below cannot fail with AttributeError on None.
    response = _poll_until_build_completes(build_op_ref) or {}

    # As before, an empty response is not treated as a failure. A non-empty
    # response must explicitly report SUCCESS; a missing status counts as a
    # failure instead of raising KeyError.
    if response and response.get('status') != 'SUCCESS':
      tracker.FailStage(
          stage_key,
          None,
          message=(
              'Container build failed and logs are available at'
              f' [{build_log_url}].'
          ),
      )
      return False

    build_config.image_id = _image_with_digest(response, image_tag) or image_tag
    log.debug(f"Image '{build_config.image_id}' created.")
    tracker.CompleteStage(stage_key)
    return True
  except exceptions.Error as e:
    log.error(f'An error occurred while waiting for build of {container}: {e}')
    tracker.FailStage(
        stage_key,
        None,
        message=f'Error waiting for build to complete: {e}',
    )
    return False


def _get_service(service_name, region):
  """Fetches a Cloud Run service of the current project.

  Args:
    service_name: Name of the service to look up.
    region: Region label used for the connection context.

  Returns:
    Whatever client.GetService returns for the service; presumably None when
    the service does not exist (the no-build path in this module tests the
    result for falsiness).
  """
  project = properties.VALUES.core.project.Get(required=True)
  # Pass the platform by keyword, as _handle_no_build does, so the value can
  # never be bound to a different positional parameter.
  conn_context = connection_context.GetConnectionContext(
      None,
      platform=platforms.PLATFORM_MANAGED,
      region_label=region,
  )
  service_ref = resources.REGISTRY.Parse(
      service_name,
      params={'namespacesId': project},
      collection='run.namespaces.services',
  )
  with serverless_operations.Connect(conn_context) as client:
    return client.GetService(service_ref)


def _handle_no_build(
    source_build: Dict[str, BuildConfig],
    project_name: str,
    region: str,
    tracker: progress_tracker.StagedProgressTracker,
) -> None:
  """Handles --no-build by reusing the images of the deployed service.

  Looks up the service named project_name and, for every container in
  source_build, copies the image of the same-named deployed container into
  build_config.image_id while driving that container's BUILD stage.

  Args:
    source_build: Maps each container name to its BuildConfig.
    project_name: Name of the already deployed service.
    region: Region label used for the connection context.
    tracker: Staged progress tracker to update.

  Raises:
    exceptions.Error: If the service does not exist yet, or if a container has
      no image in the deployed service.
  """
  if not source_build:
    return

  project = properties.VALUES.core.project.Get(required=True)
  service_ref = resources.REGISTRY.Parse(
      project_name,
      params={'namespacesId': project},
      collection='run.namespaces.services',
  )
  conn_context = connection_context.GetConnectionContext(
      None,
      platform=platforms.PLATFORM_MANAGED,
      region_label=region,
  )
  with serverless_operations.Connect(conn_context) as client:
    deployed = client.GetService(service_ref)
  if not deployed:
    raise exceptions.Error(
        '--no-build cannot be used for the first deployment of service'
        f" '{project_name}'."
    )

  # Container name -> image currently deployed for that container.
  deployed_images = {
      c.name: c.image for c in (deployed.template.spec.containers or ())
  }

  for container, build_config in source_build.items():
    stage_key = tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
        container=container
    )
    try:
      tracker.StartStage(stage_key)
      image = deployed_images.get(container)
      if not image:
        raise exceptions.Error(
            f"Could not find image for container '{container}' in service"
            f" '{project_name}'."
        )
      build_config.image_id = image
      tracker.UpdateStage(
          stage_key, f'Using image [{image}] from deployed service.'
      )
      tracker.CompleteStage(stage_key)
    except Exception as err:
      # Mark the stage as failed, then let the original error propagate.
      tracker.FailStage(stage_key, err, 'Image retrieval failed.')
      raise


def _write_cloudbuild_config(context: str, image_tag: str) -> str:
  """Writes a cloudbuild.yaml file under the 'out' directory of the context.

  The generated config has a single 'docker buildx build --load' step that
  tags the image as image_tag, and lists image_tag under 'images'.

  Args:
    context: The build context directory.
    image_tag: The full tag for the image to be built.

  Returns:
    The path to the written file, i.e. <context>/out/cloudbuild.yaml.

  Raises:
    exceptions.Error: If the directory or the file cannot be written. The
      underlying exception is attached as the cause.
  """
  config_data = {
      'steps': [{
          'id': f'Build Docker Image: {image_tag}',
          'name': 'gcr.io/cloud-builders/docker',
          'args': ['buildx', 'build', '--load', '-t', image_tag, '.'],
      }],
      'images': [image_tag],
  }

  out_dir = os.path.join(context, 'out')
  file_path = os.path.join(out_dir, 'cloudbuild.yaml')
  try:
    files.MakeDir(out_dir)
    with files.FileWriter(file_path) as f:
      yaml.dump(config_data, f)
    log.debug(f"Wrote Cloud Build config to '{file_path}'")
    return file_path
  except Exception as e:
    # Chain explicitly so the original failure is reported as the cause.
    raise exceptions.Error(
        f"Failed to write Cloud Build config to '{file_path}': {e}"
    ) from e


def _build_from_source(
    config: BuildConfig,
    container: str,
    repo: str,
    project_name: str,
    region: str,
    tracker: progress_tracker.StagedProgressTracker,
) -> tuple[resources.Resource, str, str]:
  """Submits a Cloud Build for one container from its build context.

  Args:
    config: Build configuration of the container; its context is required.
    container: Name of the container being built.
    repo: Repository prefix of the image tag.
    project_name: Compose project name, used as part of the image name.
    region: Region in which the build runs.
    tracker: Staged progress tracker to update.

  Returns:
    A (build operation reference, build log URL, image tag) tuple.

  Raises:
    ValueError: If config.context is None.
  """
  if config.context is None:
    raise ValueError('Build context is required for source build.')
  source_path = config.context

  # Images are always tagged '<repo>/<project>_<container>:latest'.
  image_tag = f'{repo}/{project_name}_{container}:latest'

  # The generated cloudbuild.yaml lives inside the build context.
  cloudbuild_yaml = _write_cloudbuild_config(source_path, image_tag)

  build_messages = cloudbuild_util.GetMessagesModule()

  log.debug(
      f"Creating build config for image '{image_tag}' from source"
      f" '{source_path}'"
  )
  # skip_set_source is left at its default; per the upstream note that means
  # the source is set up inside CreateBuildConfig (not verified here).
  cloud_build_config = submit_util.CreateBuildConfig(
      tag=None,
      no_cache=False,
      messages=build_messages,
      substitutions=None,
      arg_config=cloudbuild_yaml,
      is_specified_source=True,
      no_source=False,
      source=source_path,
      gcs_source_staging_dir=None,
      ignore_file=None,
      arg_gcs_log_dir=None,
      arg_machine_type=None,
      arg_disk_size=None,
      arg_worker_pool=None,
      arg_dir=None,
      arg_revision=None,
      arg_git_source_dir=None,
      arg_git_source_revision=None,
      arg_service_account=None,
      buildpack=None,
      hide_logs=True,
  )

  log.debug('Submitting build to Google Cloud Build')
  op_ref, log_url = _build_using_cloud_build(
      container, tracker, build_messages, cloud_build_config, region
  )
  return op_ref, log_url, image_tag


def _build_using_cloud_build(
    container, tracker, build_messages, build_config, region
):
  """Submits the build and starts the container's BUILD stage.

  Args:
    container: Name of the container being built.
    tracker: Staged progress tracker to update.
    build_messages: Cloud Build messages module.
    build_config: Cloud Build configuration to submit.
    region: Region in which the build runs.

  Returns:
    A (build operation reference, build log URL) tuple.
  """
  # NOTE(review): the positional True is presumably the async flag of
  # submit_util.Build - confirm against its signature.
  submitted_build, _ = submit_util.Build(
      build_messages,
      True,
      build_config,
      hide_logs=True,
      build_region=region,
  )

  operation_name = (
      f'projects/{submitted_build.projectId}/locations/{region}'
      f'/operations/{submitted_build.id}'
  )
  operation_ref = resources.REGISTRY.ParseRelativeName(
      operation_name, collection='cloudbuild.projects.locations.operations'
  )
  log_url = submitted_build.logUrl

  stage_key = tracker_stages.StagedProgressTrackerStage.BUILD.get_key(
      container=container
  )
  tracker.StartStage(stage_key)
  tracker.UpdateStage(stage_key, f'Logs are available at [{log_url}].')
  return operation_ref, log_url


def _poll_until_build_completes(build_op_ref):
  """Blocks until the build operation is done and returns its response.

  Args:
    build_op_ref: Reference to the Cloud Build operation to wait for.

  Returns:
    The operation's response converted to plain Python values.
  """
  cloudbuild_client = cloudbuild_util.GetClientInstance()
  finished_operation = waiter.PollUntilDone(
      waiter.CloudOperationPoller(
          cloudbuild_client.projects_builds, cloudbuild_client.operations
      ),
      build_op_ref,
  )
  return encoding.MessageToPyValue(finished_operation.response)