HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/bq/clients/client_data_transfer.py
#!/usr/bin/env python
"""The BigQuery CLI data transfer client library."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import logging
from typing import Any, Dict, NamedTuple, Optional

from googleapiclient import discovery

from clients import utils as bq_client_utils
from utils import bq_api_utils
from utils import bq_error
from utils import bq_id_utils
from utils import bq_processor_utils

Service = bq_api_utils.Service

# Data Transfer Service Authorization Info
AUTHORIZATION_CODE = 'authorization_code'
VERSION_INFO = 'version_info'


class TransferScheduleArgs:
  """Arguments to customize data transfer schedule."""

  def __init__(
      self,
      schedule: Optional[str] = None,
      start_time: Optional[str] = None,
      end_time: Optional[str] = None,
      disable_auto_scheduling: Optional[bool] = False,
      event_driven_schedule: Optional[str] = None,
  ):
    self.schedule = schedule
    self.start_time = start_time
    self.end_time = end_time
    self.disable_auto_scheduling = disable_auto_scheduling
    self.event_driven_schedule = event_driven_schedule

  def to_schedule_options_v2_payload(
      self, options_to_copy: Optional[Dict[str, Any]] = None
  ) -> Dict[str, Any]:
    """Returns a dictionary of schedule options v2.

    Args:
      options_to_copy: Existing options to copy from.

    Returns:
      A dictionary of schedule options v2 expected by the
      bigquery.transfers.create and bigquery.transfers.update API methods.

    Raises:
      bq_error.BigqueryError: If shedule options conflict.
    """
    self._validate_schedule_options()

    options = {}
    if self.event_driven_schedule:
      options['eventDrivenSchedule'] = self._process_event_driven_schedule(
          self.event_driven_schedule
      )
    elif self.disable_auto_scheduling:
      options['manualSchedule'] = {}
    else:
      options['timeBasedSchedule'] = {}
      if options_to_copy and 'timeBasedSchedule' in options_to_copy:
        options['timeBasedSchedule'] = dict(
            options_to_copy['timeBasedSchedule']
        )
      if self.schedule:
        options['timeBasedSchedule']['schedule'] = self.schedule
      if self.start_time:
        options['timeBasedSchedule']['startTime'] = self._time_or_infitity(
            self.start_time
        )
      if self.end_time:
        options['timeBasedSchedule']['endTime'] = self._time_or_infitity(
            self.end_time
        )
    return options

  def to_schedule_options_payload(
      self, options_to_copy: Optional[Dict[str, str]] = None
  ) -> Dict[str, Any]:
    """Returns a dictionary of schedule options.

    Args:
      options_to_copy: Existing options to be copied.

    Returns:
      A dictionary of schedule options expected by the
      bigquery.transfers.create and bigquery.transfers.update API methods.
    """

    # Copy the current options or start with an empty dictionary.
    options = dict(options_to_copy or {})

    if self.start_time is not None:
      options['startTime'] = self._time_or_infitity(self.start_time)
    if self.end_time is not None:
      options['endTime'] = self._time_or_infitity(self.end_time)

    options['disableAutoScheduling'] = self.disable_auto_scheduling

    return options

  def _time_or_infitity(self, time_str: str):
    """Returns None to indicate Inifinity, if time_str is an empty string."""
    return time_str or None

  def _validate_schedule_options(self):
    """Validates schedule options.

    Raises:
      bq_error.BigqueryError: If the given schedule options conflict.
    """
    is_time_based_schedule = any(
        [self.schedule, self.start_time, self.end_time]
    )
    is_event_driven_schedule = self.event_driven_schedule is not None
    if (
        sum([
            self.disable_auto_scheduling,
            is_time_based_schedule,
            is_event_driven_schedule,
        ])
    ) > 1:
      raise bq_error.BigqueryError(
          'The provided scheduling options conflict. Please specify one of'
          ' no_auto_scheduling, time-based schedule or event-driven schedule.'
      )

  def _process_event_driven_schedule(
      self,
      event_driven_schedule: str,
  ) -> Dict[str, str]:
    """Processes the event_driven_schedule given in JSON format.

    Args:
      event_driven_schedule: The user specified event driven schedule. This
        should be in JSON format given as a string. Ex:
        --event_driven_schedule='{"pubsub_subscription":"subscription"}'.

    Returns:
      parsed_event_driven_schedule: The parsed event driven schedule.

    Raises:
      bq_error.BigqueryError: If there is an error with the given params.
    """
    try:
      parsed_event_driven_schedule = json.loads(event_driven_schedule)
    except Exception as e:
      raise bq_error.BigqueryError(
          'Event driven schedule should be specified in JSON format.'
      ) from e
    if 'pubsub_subscription' not in parsed_event_driven_schedule:
      raise bq_error.BigqueryError(
          'Must specify pubsub_subscription in --event_driven_schedule.'
      )
    return parsed_event_driven_schedule


def get_transfer_config(transfer_client: discovery.Resource, transfer_id: str):
  return (
      transfer_client.projects()
      .locations()
      .transferConfigs()
      .get(name=transfer_id)
      .execute()
  )


def get_transfer_run(transfer_client: discovery.Resource, identifier: str):
  return (
      transfer_client.projects()
      .locations()
      .transferConfigs()
      .runs()
      .get(name=identifier)
      .execute()
  )


def list_transfer_configs(
    transfer_client: discovery.Resource,
    reference: Optional[bq_id_utils.ApiClientHelper.ProjectReference] = None,
    location: Optional[str] = None,
    page_size: Optional[int] = None,
    page_token: Optional[str] = None,
    data_source_ids: Optional[str] = None,
):
  """Return a list of transfer configurations.

  Args:
    transfer_client: the transfer client to use.
    reference: The ProjectReference to list transfer configurations for.
    location: The location id, e.g. 'us' or 'eu'.
    page_size: The maximum number of transfer configurations to return.
    page_token: Current page token (optional).
    data_source_ids: The dataSourceIds to display transfer configurations for.

  Returns:
    A list of transfer configurations.
  """
  results = None
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.ProjectReference,
      method='list_transfer_configs',
  )
  if page_size is not None:
    if page_size > bq_processor_utils.MAX_RESULTS:
      page_size = bq_processor_utils.MAX_RESULTS
  request = bq_processor_utils.PrepareTransferListRequest(
      reference, location, page_size, page_token, data_source_ids
  )
  if request:
    bq_processor_utils.ApplyParameters(request)
    result = (
        transfer_client.projects()
        .locations()
        .transferConfigs()
        .list(**request)
        .execute()
    )
    results = result.get('transferConfigs', [])
    if page_size is not None:
      while 'nextPageToken' in result and len(results) < page_size:
        request = bq_processor_utils.PrepareTransferListRequest(
            reference,
            location,
            page_size - len(results),
            result['nextPageToken'],
            data_source_ids,
        )
        if request:
          bq_processor_utils.ApplyParameters(request)
          result = (
              transfer_client.projects()
              .locations()
              .transferConfigs()
              .list(**request)
              .execute()
          )
          results.extend(result.get('nextPageToken', []))
        else:
          return
    if len(results) < 1:
      logging.info('There are no transfer configurations to be shown.')
    if result.get('nextPageToken'):
      return (results, result.get('nextPageToken'))
  return (results,)


def list_transfer_runs(
    transfer_client: discovery.Resource,
    reference: Optional[bq_id_utils.ApiClientHelper.TransferConfigReference],
    run_attempt: Optional[str],
    max_results: Optional[int] = None,
    page_token: Optional[str] = None,
    states: Optional[str] = None,
):
  """Return a list of transfer runs.

  Args:
    transfer_client: the transfer client to use.
    reference: The ProjectReference to list transfer runs for.
    run_attempt: Which runs should be pulled. The default value is 'LATEST',
      which only returns the latest run per day. To return all runs, please
      specify 'RUN_ATTEMPT_UNSPECIFIED'.
    max_results: The maximum number of transfer runs to return (optional).
    page_token: Current page token (optional).
    states: States to filter transfer runs (optional).

  Returns:
    A list of transfer runs.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.TransferConfigReference,
      method='list_transfer_runs',
  )
  reference = str(reference)
  request = bq_processor_utils.PrepareTransferRunListRequest(
      reference, run_attempt, max_results, page_token, states
  )
  response = (
      transfer_client.projects()
      .locations()
      .transferConfigs()
      .runs()
      .list(**request)
      .execute()
  )
  transfer_runs = response.get('transferRuns', [])
  if max_results is not None:
    while 'nextPageToken' in response and len(transfer_runs) < max_results:
      page_token = response.get('nextPageToken')
      max_results -= len(transfer_runs)
      request = bq_processor_utils.PrepareTransferRunListRequest(
          reference, run_attempt, max_results, page_token, states
      )
      response = (
          transfer_client.projects()
          .locations()
          .transferConfigs()
          .runs()
          .list(**request)
          .execute()
      )
      transfer_runs.extend(response.get('transferRuns', []))
    if response.get('nextPageToken'):
      return (transfer_runs, response.get('nextPageToken'))
  return (transfer_runs,)


def list_transfer_logs(
    transfer_client: discovery.Resource,
    reference: bq_id_utils.ApiClientHelper.TransferRunReference,
    message_type: Optional[str] = None,
    max_results: Optional[int] = None,
    page_token: Optional[str] = None,
):
  """Return a list of transfer run logs.

  Args:
    transfer_client: the transfer client to use.
    reference: The ProjectReference to list transfer run logs for.
    message_type: Message types to return.
    max_results: The maximum number of transfer run logs to return.
    page_token: Current page token (optional).

  Returns:
    A list of transfer run logs.
  """
  reference = str(reference)
  request = bq_processor_utils.PrepareListTransferLogRequest(
      reference,
      max_results=max_results,
      page_token=page_token,
      message_type=message_type,
  )
  response = (
      transfer_client.projects()
      .locations()
      .transferConfigs()
      .runs()
      .transferLogs()
      .list(**request)
      .execute()
  )
  transfer_logs = response.get('transferMessages', [])
  if max_results is not None:
    while 'nextPageToken' in response and len(transfer_logs) < max_results:
      page_token = response['nextPageToken']
      max_results -= len(transfer_logs)
      request = bq_processor_utils.PrepareListTransferLogRequest(
          reference,
          max_results=max_results,
          page_token=page_token,
          message_type=message_type,
      )
      response = (
          transfer_client.projects()
          .locations()
          .transferConfigs()
          .runs()
          .transferLogs()
          .list(**request)
          .execute()
      )
      transfer_logs.extend(response.get('transferMessages', []))
  if response.get('nextPageToken'):
    return (transfer_logs, response.get('nextPageToken'))
  return (transfer_logs,)


def start_manual_transfer_runs(
    transfer_client: discovery.Resource,
    reference: bq_id_utils.ApiClientHelper.TransferConfigReference,
    start_time: Optional[str],
    end_time: Optional[str],
    run_time: Optional[str],
):
  """Starts manual transfer runs.

  Args:
    transfer_client: the transfer client to use.
    reference: Transfer configuration name for the run.
    start_time: Start time of the range of transfer runs.
    end_time: End time of the range of transfer runs.
    run_time: Specific time for a transfer run.

  Returns:
    The list of started transfer runs.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.TransferConfigReference,
      method='start_manual_transfer_runs',
  )
  parent = str(reference)

  if run_time:
    body = {'requestedRunTime': run_time}
  else:
    body = {
        'requestedTimeRange': {'startTime': start_time, 'endTime': end_time}
    }

  configs_request = transfer_client.projects().locations().transferConfigs()
  response = configs_request.startManualRuns(parent=parent, body=body).execute()

  return response.get('runs')


def transfer_exists(
    transfer_client: discovery.Resource,
    reference: 'bq_id_utils.ApiClientHelper.TransferConfigReference',
) -> bool:
  """Returns true if the transfer exists."""
  # pylint: disable=missing-function-docstring
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.TransferConfigReference,
      method='transfer_exists',
  )
  try:
    transfer_client.projects().locations().transferConfigs().get(
        name=reference.transferConfigName
    ).execute()
    return True
  except bq_error.BigqueryNotFoundError:
    return False


def _fetch_data_source(
    transfer_client: discovery.Resource,
    project_reference: str,
    data_source_id: str,
):
  data_source_retrieval = (
      project_reference + '/locations/-/dataSources/' + data_source_id
  )

  return (
      transfer_client.projects()
      .locations()
      .dataSources()
      .get(name=data_source_retrieval)
      .execute()
  )


def update_transfer_config(
    transfer_client: discovery.Resource,
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    reference: bq_id_utils.ApiClientHelper.TransferConfigReference,
    target_dataset: Optional[str] = None,
    display_name: Optional[str] = None,
    refresh_window_days: Optional[str] = None,
    params: Optional[str] = None,
    auth_info: Optional[Dict[str, str]] = None,
    service_account_name: Optional[str] = None,
    destination_kms_key: Optional[str] = None,
    notification_pubsub_topic: Optional[str] = None,
    schedule_args: Optional[TransferScheduleArgs] = None,
):
  """Updates a transfer config.

  Args:
    transfer_client: the transfer client to use.
    id_fallbacks: IDs to use when they have not been explicitly specified.
    reference: the TransferConfigReference to update.
    target_dataset: Optional updated target dataset.
    display_name: Optional change to the display name.
    refresh_window_days: Optional update to the refresh window days. Some data
      sources do not support this.
    params: Optional parameters to update.
    auth_info: A dict contains authorization info which can be either an
      authorization_code or a version_info that the user input if they want to
      update credentials.
    service_account_name: The service account that the user could act as and
      used as the credential to create transfer runs from the transfer config.
    destination_kms_key: Optional KMS key for encryption.
    notification_pubsub_topic: The Pub/Sub topic where notifications will be
      sent after transfer runs associated with this transfer config finish.
    schedule_args: Optional parameters to customize data transfer schedule.

  Raises:
    BigqueryTypeError: if reference is not a TransferConfigReference.
    bq_error.BigqueryError: required field not given.
  """

  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.TransferConfigReference,
      method='update_transfer_config',
  )
  project_reference = 'projects/' + (
      bq_client_utils.GetProjectReference(id_fallbacks=id_fallbacks).projectId
  )
  current_config = get_transfer_config(
      transfer_client, reference.transferConfigName
  )
  update_mask = []
  update_items = {}
  update_items['dataSourceId'] = current_config['dataSourceId']
  if target_dataset:
    update_items['destinationDatasetId'] = target_dataset
    update_mask.append('transfer_config.destination_dataset_id')

  if display_name:
    update_mask.append('transfer_config.display_name')
    update_items['displayName'] = display_name

  if params:
    update_items = bq_processor_utils.ProcessParamsFlag(params, update_items)
    update_mask.append('transfer_config.params')

  # if refresh window provided, check that data source supports it
  if refresh_window_days:
    data_source_info = _fetch_data_source(
        transfer_client, project_reference, current_config['dataSourceId']
    )
    update_items = bq_processor_utils.ProcessRefreshWindowDaysFlag(
        refresh_window_days,
        data_source_info,
        update_items,
        current_config['dataSourceId'],
    )
    update_mask.append('transfer_config.data_refresh_window_days')

  if schedule_args:
    update_items['scheduleOptionsV2'] = (
        schedule_args.to_schedule_options_v2_payload(
            current_config.get('scheduleOptionsV2')
        )
    )
    update_mask.append('transfer_config.scheduleOptionsV2')

  if notification_pubsub_topic:
    update_items['notification_pubsub_topic'] = notification_pubsub_topic
    update_mask.append('transfer_config.notification_pubsub_topic')

  if auth_info is not None and AUTHORIZATION_CODE in auth_info:
    update_mask.append(AUTHORIZATION_CODE)

  if auth_info is not None and VERSION_INFO in auth_info:
    update_mask.append(VERSION_INFO)

  if service_account_name:
    update_mask.append('service_account_name')

  if destination_kms_key:
    update_items['encryption_configuration'] = {
        'kms_key_name': {'value': destination_kms_key}
    }
    update_mask.append('encryption_configuration.kms_key_name')

  try:
    transfer_client.projects().locations().transferConfigs().patch(
        body=update_items,
        name=reference.transferConfigName,
        updateMask=','.join(update_mask),
        authorizationCode=(
            None if auth_info is None else auth_info.get(AUTHORIZATION_CODE)
        ),
        versionInfo=None if auth_info is None else auth_info.get(VERSION_INFO),
        serviceAccountName=service_account_name,
        x__xgafv='2',
    ).execute()
  except bq_error.BigqueryInterfaceError as e:
    if target_dataset and 'Not found: Dataset' in str(e):
      dataset_reference = bq_client_utils.GetDatasetReference(
          id_fallbacks=id_fallbacks, identifier=target_dataset
      )
      raise bq_error.BigqueryNotFoundError(
          'Not found: %r' % (dataset_reference,), {'reason': 'notFound'}, []
      ) from e


def create_transfer_config(
    transfer_client: discovery.Resource,
    reference: str,
    data_source: str,
    target_dataset: Optional[str] = None,
    display_name: Optional[str] = None,
    refresh_window_days: Optional[str] = None,
    params: Optional[str] = None,
    auth_info: Optional[Dict[str, str]] = None,
    service_account_name: Optional[str] = None,
    notification_pubsub_topic: Optional[str] = None,
    schedule_args: Optional[TransferScheduleArgs] = None,
    destination_kms_key: Optional[str] = None,
    location: Optional[str] = None,
):
  """Create a transfer config corresponding to TransferConfigReference.

  Args:
    transfer_client: the transfer client to use.
    reference: the TransferConfigReference to create.
    data_source: The data source for the transfer config.
    target_dataset: The dataset where the new transfer config will exist.
    display_name: A display name for the transfer config.
    refresh_window_days: Refresh window days for the transfer config.
    params: Parameters for the created transfer config. The parameters should be
      in JSON format given as a string. Ex: --params="{'param':'value'}". The
      params should be the required values needed for each data source and will
      vary.
    auth_info: A dict contains authorization info which can be either an
      authorization_code or a version_info that the user input if they need
      credentials.
    service_account_name: The service account that the user could act as and
      used as the credential to create transfer runs from the transfer config.
    notification_pubsub_topic: The Pub/Sub topic where notifications will be
      sent after transfer runs associated with this transfer config finish.
    schedule_args: Optional parameters to customize data transfer schedule.
    destination_kms_key: Optional KMS key for encryption.
    location: The location where the new transfer config will run.

  Raises:
    BigqueryNotFoundError: if a requested item is not found.
    bq_error.BigqueryError: if a required field isn't provided.

  Returns:
    The generated transfer configuration name.
  """
  create_items = {}

  # The backend will check if the dataset exists.
  if target_dataset:
    create_items['destinationDatasetId'] = target_dataset

  if display_name:
    create_items['displayName'] = display_name
  else:
    raise bq_error.BigqueryError('A display name must be provided.')

  create_items['dataSourceId'] = data_source

  # if refresh window provided, check that data source supports it
  if refresh_window_days:
    data_source_info = _fetch_data_source(
        transfer_client, reference, data_source
    )
    create_items = bq_processor_utils.ProcessRefreshWindowDaysFlag(
        refresh_window_days, data_source_info, create_items, data_source
    )

  # checks that all required params are given
  # if a param that isn't required is provided, it is ignored.
  if params:
    create_items = bq_processor_utils.ProcessParamsFlag(params, create_items)
  else:
    raise bq_error.BigqueryError('Parameters must be provided.')

  if location:
    parent = reference + '/locations/' + location
  else:
    # The location is infererred by the data transfer service from the
    # dataset location.
    parent = reference + '/locations/-'

  if schedule_args:
    create_items['scheduleOptionsV2'] = (
        schedule_args.to_schedule_options_v2_payload()
    )

  if notification_pubsub_topic:
    create_items['notification_pubsub_topic'] = notification_pubsub_topic

  if destination_kms_key:
    create_items['encryption_configuration'] = {
        'kms_key_name': {'value': destination_kms_key}
    }

  new_transfer_config = (
      transfer_client.projects()
      .locations()
      .transferConfigs()
      .create(
          parent=parent,
          body=create_items,
          authorizationCode=(
              None if auth_info is None else auth_info.get(AUTHORIZATION_CODE)
          ),
          versionInfo=None
          if auth_info is None
          else auth_info.get(VERSION_INFO),
          serviceAccountName=service_account_name,
      )
      .execute()
  )

  return new_transfer_config['name']


def delete_transfer_config(
    transfer_client: discovery.Resource,
    reference: bq_id_utils.ApiClientHelper.TransferConfigReference,
    ignore_not_found: bool = False,
):
  """Deletes TransferConfigReference reference.

  Args:
    transfer_client: the transfer client to use.
    reference: the TransferConfigReference to delete.
    ignore_not_found: Whether to ignore "not found" errors.

  Raises:
    BigqueryTypeError: if reference is not a TransferConfigReference.
    bq_error.BigqueryNotFoundError: if reference does not exist and
      ignore_not_found is False.
  """

  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.TransferConfigReference,
      method='delete_transfer_config',
  )
  try:
    transfer_client.projects().locations().transferConfigs().delete(
        name=reference.transferConfigName
    ).execute()
  except bq_error.BigqueryNotFoundError as e:
    if not ignore_not_found:
      raise bq_error.BigqueryNotFoundError(
          'Not found: %r' % (reference,), {'reason': 'notFound'}, []
      ) from e