HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/bq/clients/client_dataset.py
#!/usr/bin/env python
"""The BigQuery CLI dataset client library."""

import datetime
from typing import Dict, List, NamedTuple, Optional
from googleapiclient import discovery
from clients import utils as bq_client_utils
from frontend import utils as frontend_utils
from utils import bq_error
from utils import bq_id_utils
from utils import bq_processor_utils

EXTERNAL_CATALOG_DATASET_OPTIONS_FIELD_NAME = 'externalCatalogDatasetOptions'


def GetDataset(apiclient: discovery.Resource, reference, dataset_view=None):
  """Get dataset with dataset_view parameter."""
  request = dict(reference)
  request['accessPolicyVersion'] = (
      bq_client_utils.MAX_SUPPORTED_IAM_POLICY_VERSION
  )
  if dataset_view is not None:
    request['datasetView'] = dataset_view
  return apiclient.datasets().get(**request).execute()


def ListDatasets(
    apiclient: discovery.Resource,
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    reference: Optional[bq_id_utils.ApiClientHelper.ProjectReference] = None,
    max_results: Optional[int] = None,
    page_token: Optional[str] = None,
    list_all: Optional[bool] = None,
    filter_expression: Optional[str] = None,
):
  """List the datasets associated with this reference."""
  return ListDatasetsWithTokenAndUnreachable(
      apiclient,
      id_fallbacks,
      reference,
      max_results,
      page_token,
      list_all,
      filter_expression,
  )['datasets']


def ListDatasetsWithTokenAndUnreachable(
    apiclient: discovery.Resource,
    id_fallbacks: NamedTuple(
        'IDS',
        [
            ('project_id', Optional[str]),
        ],
    ),
    reference: Optional[bq_id_utils.ApiClientHelper.ProjectReference] = None,
    max_results: Optional[int] = None,
    page_token: Optional[str] = None,
    list_all: Optional[bool] = None,
    filter_expression: Optional[str] = None,
):
  """List the datasets associated with this reference."""
  reference = bq_client_utils.NormalizeProjectReference(
      id_fallbacks=id_fallbacks, reference=reference
  )
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.ProjectReference,
      method='ListDatasets',
  )
  request = bq_processor_utils.PrepareListRequest(
      reference, max_results, page_token, filter_expression
  )
  if list_all is not None:
    request['all'] = list_all
  result = apiclient.datasets().list(**request).execute()
  dataset_list = result.get('datasets', [])
  unreachable_set = set(result.get('unreachable', []))
  next_token = result.get('nextPageToken', None)
  if max_results is not None:
    while 'nextPageToken' in result and len(dataset_list) < max_results:
      request['maxResults'] = max_results - len(dataset_list)
      request['pageToken'] = result['nextPageToken']
      result = apiclient.datasets().list(**request).execute()
      dataset_list.extend(result.get('datasets', []))
      unreachable_set.update(result.get('unreachable', []))
      next_token = result.get('nextPageToken', None)
  response = dict(datasets=dataset_list)
  if next_token:
    response['token'] = next_token
  if unreachable_set:
    response['unreachable'] = list(unreachable_set)
  return response


def GetDatasetIAMPolicy(apiclient, reference):
  """Gets IAM policy for the given dataset resource.

  Arguments:
    apiclient: the apiclient used to make the request.
    reference: the DatasetReference for the dataset resource.

  Returns:
    The IAM policy attached to the given dataset resource.

  Raises:
    BigqueryTypeError: if reference is not a DatasetReference.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.DatasetReference,
      method='GetDatasetIAMPolicy',
  )
  formatted_resource = 'projects/%s/datasets/%s' % (
      reference.projectId,
      reference.datasetId,
  )
  body = {
      'options': {
          'requestedPolicyVersion': (
              bq_client_utils.MAX_SUPPORTED_IAM_POLICY_VERSION
          )
      }
  }
  return (
      apiclient.datasets()
      .getIamPolicy(
          resource=formatted_resource,
          body=body,
      )
      .execute()
  )


def SetDatasetIAMPolicy(apiclient: discovery.Resource, reference, policy):
  """Sets IAM policy for the given dataset resource.

  Arguments:
    apiclient: the apiclient used to make the request.
    reference: the DatasetReference for the dataset resource.
    policy: The policy string in JSON format.

  Returns:
    The updated IAM policy attached to the given dataset resource.

  Raises:
    BigqueryTypeError: if reference is not a DatasetReference.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.DatasetReference,
      method='SetDatasetIAMPolicy',
  )
  formatted_resource = 'projects/%s/datasets/%s' % (
      reference.projectId,
      reference.datasetId,
  )
  request = {'policy': policy}
  return (
      apiclient.datasets()
      .setIamPolicy(body=request, resource=formatted_resource)
      .execute()
  )


def DatasetExists(
    apiclient: discovery.Resource,
    reference: 'bq_id_utils.ApiClientHelper.DatasetReference',
) -> bool:
  """Returns true if a dataset exists."""
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.DatasetReference,
      method='DatasetExists',
  )
  try:
    apiclient.datasets().get(**dict(reference)).execute()
    return True
  except bq_error.BigqueryNotFoundError:
    return False


def GetDatasetRegion(
    apiclient: discovery.Resource,
    reference: 'bq_id_utils.ApiClientHelper.DatasetReference',
) -> Optional[str]:
  """Returns the region of a dataset as a string."""
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.DatasetReference,
      method='GetDatasetRegion',
  )
  try:
    return apiclient.datasets().get(**dict(reference)).execute()['location']
  except bq_error.BigqueryNotFoundError:
    return None


# TODO(b/191712821): add tags modification here. For the Preview Tags are not
# modifiable using BigQuery UI/Cli, only using ResourceManager.
def CreateDataset(
    apiclient: discovery.Resource,
    reference,
    ignore_existing=False,
    description=None,
    display_name=None,
    acl=None,
    default_table_expiration_ms=None,
    default_partition_expiration_ms=None,
    data_location=None,
    labels=None,
    default_kms_key=None,
    source_dataset_reference=None,
    external_source=None,
    connection_id=None,
    external_catalog_dataset_options=None,
    max_time_travel_hours=None,
    storage_billing_model=None,
    resource_tags=None,
):
  """Create a dataset corresponding to DatasetReference.

  Args:
    apiclient: The apiclient used to make the request.
    reference: The DatasetReference to create.
    ignore_existing: (boolean, default False) If False, raise an exception if
      the dataset already exists.
    description: An optional dataset description.
    display_name: An optional friendly name for the dataset.
    acl: An optional ACL for the dataset, as a list of dicts.
    default_table_expiration_ms: Default expiration time to apply to new tables
      in this dataset.
    default_partition_expiration_ms: Default partition expiration time to apply
      to new partitioned tables in this dataset.
    data_location: Location where the data in this dataset should be stored.
      Must be either 'EU' or 'US'. If specified, the project that owns the
      dataset must be enabled for data location.
    labels: An optional dict of labels.
    default_kms_key: An optional kms dey that will apply to all newly created
      tables in the dataset, if no explicit key is supplied in the creating
      request.
    source_dataset_reference: An optional ApiClientHelper.DatasetReference that
      will be the source of this linked dataset. #
    external_source: External source that backs this dataset.
    connection_id: Connection used for accessing the external_source.
    external_catalog_dataset_options: An optional JSON string or file path
      containing the external catalog dataset options to create.
    max_time_travel_hours: Optional. Define the max time travel in hours. The
      value can be from 48 to 168 hours (2 to 7 days). The default value is 168
      hours if this is not set.
    storage_billing_model: Optional. Sets the storage billing model for the
      dataset.
    resource_tags: An optional dict of tags to attach to the dataset.

  Raises:
    BigqueryTypeError: If reference is not an ApiClientHelper.DatasetReference
      or if source_dataset_reference is provided but is not an
      bq_id_utils.ApiClientHelper.DatasetReference.
      or if both external_dataset_reference and source_dataset_reference
      are provided or if not all required arguments for external database is
      provided.
    BigqueryDuplicateError: if reference exists and ignore_existing
        is False.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.DatasetReference,
      method='CreateDataset',
  )

  body = bq_processor_utils.ConstructObjectInfo(reference)
  if display_name is not None:
    body['friendlyName'] = display_name
  if description is not None:
    body['description'] = description
  if acl is not None:
    body['access'] = acl
  if default_table_expiration_ms is not None:
    body['defaultTableExpirationMs'] = default_table_expiration_ms
  if default_partition_expiration_ms is not None:
    body['defaultPartitionExpirationMs'] = default_partition_expiration_ms
  if default_kms_key is not None:
    body['defaultEncryptionConfiguration'] = {'kmsKeyName': default_kms_key}
  if data_location is not None:
    body['location'] = data_location
  if labels:
    body['labels'] = {}
    for label_key, label_value in labels.items():
      body['labels'][label_key] = label_value
  if source_dataset_reference is not None:
    bq_id_utils.typecheck(
        source_dataset_reference,
        bq_id_utils.ApiClientHelper.DatasetReference,
        method='CreateDataset',
    )
    body['linkedDatasetSource'] = {
        'sourceDataset': bq_processor_utils.ConstructObjectInfo(
            source_dataset_reference
        )['datasetReference']
    }
  # externalDatasetReference can only be specified in case of externals
  # datasets. This option cannot be used in case of regular dataset or linked
  # datasets.
  # So we only set this if an external_source is specified.
  if external_source:
    body['externalDatasetReference'] = {
        'externalSource': external_source,
        'connection': connection_id,
    }
  if external_catalog_dataset_options is not None:
    body[EXTERNAL_CATALOG_DATASET_OPTIONS_FIELD_NAME] = frontend_utils.GetJson(
        external_catalog_dataset_options
    )
  if max_time_travel_hours is not None:
    body['maxTimeTravelHours'] = max_time_travel_hours
  if storage_billing_model is not None:
    body['storageBillingModel'] = storage_billing_model
  if resource_tags is not None:
    body['resourceTags'] = resource_tags

  args = dict(reference.GetProjectReference())
  args['accessPolicyVersion'] = bq_client_utils.MAX_SUPPORTED_IAM_POLICY_VERSION
  try:
    apiclient.datasets().insert(body=body, **args).execute()
  except bq_error.BigqueryDuplicateError:
    if not ignore_existing:
      raise


def UpdateDataset(
    apiclient: discovery.Resource,
    reference: 'bq_id_utils.ApiClientHelper.DatasetReference',
    description: Optional[str] = None,
    display_name: Optional[str] = None,
    acl=None,
    default_table_expiration_ms=None,
    default_partition_expiration_ms=None,
    labels_to_set=None,
    label_keys_to_remove=None,
    etag=None,
    default_kms_key=None,
    max_time_travel_hours=None,
    storage_billing_model=None,
    tags_to_attach: Optional[Dict[str, str]] = None,
    tags_to_remove: Optional[List[str]] = None,
    clear_all_tags: Optional[bool] = False,
    external_catalog_dataset_options: Optional[str] = None,
    update_mode: Optional[bq_client_utils.UpdateMode] = None,
):
  """Updates a dataset.

  Args:
    apiclient: The apiclient used to make the request.
    reference: The DatasetReference to update.
    description: An optional dataset description.
    display_name: An optional friendly name for the dataset.
    acl: An optional ACL for the dataset, as a list of dicts.
    default_table_expiration_ms: Optional number of milliseconds for the default
      expiration duration for new tables created in this dataset.
    default_partition_expiration_ms: Optional number of milliseconds for the
      default partition expiration duration for new partitioned tables created
      in this dataset.
    labels_to_set: An optional dict of labels to set on this dataset.
    label_keys_to_remove: An optional list of label keys to remove from this
      dataset.
    etag: If set, checks that etag in the existing dataset matches.
    default_kms_key: An optional kms dey that will apply to all newly created
      tables in the dataset, if no explicit key is supplied in the creating
      request.
    max_time_travel_hours: Optional. Define the max time travel in hours. The
      value can be from 48 to 168 hours (2 to 7 days). The default value is 168
      hours if this is not set.
    storage_billing_model: Optional. Sets the storage billing model for the
      dataset.
    tags_to_attach: An optional dict of tags to attach to the dataset
    tags_to_remove: An optional list of tag keys to remove from the dataset
    clear_all_tags: If set, clears all the tags attached to the dataset
    external_catalog_dataset_options: An optional JSON string or file path
      containing the external catalog dataset options to update.
    update_mode: An optional flag indicating which datasets fields to update,
      either metadata fields only, ACL fields only, or both metadata and ACL
      fields.

  Raises:
    BigqueryTypeError: If reference is not a DatasetReference.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.DatasetReference,
      method='UpdateDataset',
  )

  # Get the existing dataset and associated ETag.
  dataset = _ExecuteGetDatasetRequest(apiclient, reference, etag)

  # Merge in the changes.
  if display_name is not None:
    dataset['friendlyName'] = display_name
  if description is not None:
    dataset['description'] = description
  if acl is not None:
    dataset['access'] = acl
  if default_table_expiration_ms is not None:
    dataset['defaultTableExpirationMs'] = default_table_expiration_ms
  if default_partition_expiration_ms is not None:
    if default_partition_expiration_ms == 0:
      dataset['defaultPartitionExpirationMs'] = None
    else:
      dataset['defaultPartitionExpirationMs'] = default_partition_expiration_ms
  if default_kms_key is not None:
    dataset['defaultEncryptionConfiguration'] = {'kmsKeyName': default_kms_key}
  if 'labels' not in dataset:
    dataset['labels'] = {}
  if labels_to_set:
    for label_key, label_value in labels_to_set.items():
      dataset['labels'][label_key] = label_value
  if label_keys_to_remove:
    for label_key in label_keys_to_remove:
      dataset['labels'][label_key] = None
  if max_time_travel_hours is not None:
    dataset['maxTimeTravelHours'] = max_time_travel_hours
  if storage_billing_model is not None:
    dataset['storageBillingModel'] = storage_billing_model
  resource_tags = {}
  if clear_all_tags and 'resourceTags' in dataset:
    for tag in dataset['resourceTags']:
      resource_tags[tag] = None
  else:
    for tag in tags_to_remove or []:
      resource_tags[tag] = None
  for tag in tags_to_attach or {}:
    resource_tags[tag] = tags_to_attach[tag]
  # resourceTags is used to add a new tag binding, update value of existing
  # tag and also to remove a tag binding
  dataset['resourceTags'] = resource_tags

  if external_catalog_dataset_options is not None:
    dataset.setdefault(EXTERNAL_CATALOG_DATASET_OPTIONS_FIELD_NAME, {})
    current_options = dataset[EXTERNAL_CATALOG_DATASET_OPTIONS_FIELD_NAME]
    dataset[EXTERNAL_CATALOG_DATASET_OPTIONS_FIELD_NAME] = (
        frontend_utils.UpdateExternalCatalogDatasetOptions(
            current_options, external_catalog_dataset_options
        )
    )

  _ExecutePatchDatasetRequest(
      apiclient,
      reference,
      dataset,
      etag,
      update_mode,
  )


def _ExecuteGetDatasetRequest(
    apiclient: discovery.Resource,
    reference,
    etag: Optional[str] = None,
):
  """Executes request to get dataset.

  Args:
    apiclient: the apiclient used to make the request.
    reference: the DatasetReference to get.
    etag: if set, checks that etag in the existing dataset matches.

  Returns:
  The result of executing the request, if it succeeds.
  """
  args = dict(reference)
  args['accessPolicyVersion'] = bq_client_utils.MAX_SUPPORTED_IAM_POLICY_VERSION
  get_request = apiclient.datasets().get(**args)
  if etag:
    get_request.headers['If-Match'] = etag
  dataset = get_request.execute()
  return dataset


def _ExecutePatchDatasetRequest(
    apiclient: discovery.Resource,
    reference,
    dataset,
    etag: Optional[str] = None,
    update_mode: Optional[bq_client_utils.UpdateMode] = None,
):
  """Executes request to patch dataset.

  Args:
    apiclient: the apiclient used to make the request.
    reference: the DatasetReference to patch.
    dataset: the body of request
    etag: if set, checks that etag in the existing dataset matches.
    update_mode: a flag indicating which datasets fields to update.
  """
  parameters = dict(reference)
  parameters['accessPolicyVersion'] = (
      bq_client_utils.MAX_SUPPORTED_IAM_POLICY_VERSION
  )
  if update_mode is not None:
    parameters['updateMode'] = update_mode.value

  request = apiclient.datasets().patch(body=dataset, **parameters)

  # Perform a conditional update to protect against concurrent
  # modifications to this dataset.  By placing the ETag returned in
  # the get operation into the If-Match header, the API server will
  # make sure the dataset hasn't changed.  If there is a conflicting
  # change, this update will fail with a "Precondition failed"
  # error.
  if etag or dataset['etag']:
    request.headers['If-Match'] = etag if etag else dataset['etag']
  request.execute()


def DeleteDataset(
    apiclient: discovery.Resource,
    reference: bq_id_utils.ApiClientHelper.DatasetReference,
    ignore_not_found: bool = False,
    delete_contents: Optional[bool] = None,
) -> None:
  """Deletes DatasetReference reference.

  Args:
    apiclient: the api client to make the request with.
    reference: the DatasetReference to delete.
    ignore_not_found: Whether to ignore "not found" errors.
    delete_contents: [Boolean] Whether to delete the contents of non-empty
      datasets. If not specified and the dataset has tables in it, the delete
      will fail. If not specified, the server default applies.

  Raises:
    BigqueryTypeError: if reference is not a DatasetReference.
    bq_error.BigqueryNotFoundError: if reference does not exist and
      ignore_not_found is False.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.DatasetReference,
      method='DeleteDataset',
  )

  args = dict(reference)

  if delete_contents is not None:
    args['deleteContents'] = delete_contents
  try:
    apiclient.datasets().delete(**args).execute()
  except bq_error.BigqueryNotFoundError:
    if not ignore_not_found:
      raise


def UndeleteDataset(
    apiclient: discovery.Resource,
    dataset_reference: bq_id_utils.ApiClientHelper.DatasetReference,
    timestamp: Optional[datetime.datetime] = None,
) -> bool:
  """Undeletes a dataset.

  Args:
    apiclient: The api client to make the request with.
    dataset_reference: [Type:
      bq_id_utils.ApiClientHelper.DatasetReference]DatasetReference of the
      dataset to be undeleted
    timestamp: [Type: Optional[datetime.datetime]]Timestamp for which dataset
      version is to be undeleted

  Returns:
    bool: The job description, or None for ignored errors.

  Raises:
    BigqueryDuplicateError: when the dataset to be undeleted already exists.
  """
  try:
    args = dict(dataset_reference)
    if timestamp:
      args['body'] = {
          'deletionTime': frontend_utils.FormatRfc3339(timestamp).replace(
              '+00:00', ''
          )
      }
    return apiclient.datasets().undelete(**args).execute()

  except bq_error.BigqueryDuplicateError as e:
    raise e