HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/bigtable/tables.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Bigtable tables API helper."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import base64

from apitools.base.py import encoding
from googlecloudsdk.api_lib.bigtable import util
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.core import yaml
from googlecloudsdk.core.util import times
import six


def ParseSingleRule(rule):
  """Parses GC rules from a rule string.

  Args:
    rule: A string representing a GC rule, e.g. `maxage=10d`

  Returns:
    A GcRule object.

  Raises:
    BadArgumentExpection: the input is mal-formatted.
  """
  rule_parts = rule.split('=')
  if len(rule_parts) != 2 or not rule_parts[1]:
    raise exceptions.BadArgumentException(
        '--column-families',
        'Invalid union or intersection rule: {0}'.format(rule),
    )

  if rule_parts[0] == 'maxage':
    return util.GetAdminMessages().GcRule(
        maxAge=ConvertDurationToSeconds(rule_parts[1], '--column-families')
    )
  elif rule_parts[0] == 'maxversions':
    return util.GetAdminMessages().GcRule(maxNumVersions=int(rule_parts[1]))
  else:
    raise exceptions.BadArgumentException(
        '--column-families',
        'Invalid union or intersection rule: {0}'.format(rule),
    )


def ParseBinaryRule(rule_list):
  """Parses GC rules from a rule list of 2 elements.

  Args:
    rule_list: A string list containing 2 elements.

  Returns:
    A list of GcRule objects.

  Raises:
    BadArgumentExpection: the input list is mal-formatted.
  """
  if len(rule_list) != 2:
    # Only support binary rule
    raise exceptions.BadArgumentException(
        '--column-families',
        'Invalid union or intersection rule: ' + ' '.join(rule_list),
    )

  results = []
  for rule in rule_list:
    results.append(ParseSingleRule(rule))

  return results


def ParseExpr(expr):
  """Parses family name and GC rules from the string expression.

  Args:
    expr: A string express contains family name and optional GC rules in the
      format of `family_name[:gc_rule]`, such as `my_family:maxage=10d`.

  Returns:
    A family name and a GcRule object defined in the Bigtable admin API.

  Raises:
    BadArgumentExpection: the input string is mal-formatted.
  """
  expr_list = expr.split(':')
  family = expr_list[0]
  expr_list_len = len(expr_list)
  if expr_list_len > 2 or family != family.strip():
    raise exceptions.BadArgumentException(
        '--column-families',
        'Input column family ({0}) is mal-formatted.'.format(expr),
    )

  # Without GC rules
  if expr_list_len == 1:
    # No GC rule is allowed
    return family, None

  # With GC rules
  if not expr_list[1]:
    raise exceptions.BadArgumentException(
        '--column-families',
        'Input column family ({0}) is mal-formatted.'.format(expr),
    )

  gc_rule = expr_list[1]
  union_list = gc_rule.split('||')
  intersection_list = gc_rule.split('&&')

  # Only do 1-level of parsing, don't support nested expression.
  if len(union_list) == 2 and len(intersection_list) == 1:
    # Union rule
    return family, util.GetAdminMessages().GcRule(
        union=util.GetAdminMessages().Union(rules=ParseBinaryRule(union_list))
    )
  elif len(union_list) == 1 and len(intersection_list) == 2:
    # Intersection rule
    return family, util.GetAdminMessages().GcRule(
        intersection=util.GetAdminMessages().Intersection(
            rules=ParseBinaryRule(intersection_list)
        )
    )
  elif len(union_list) == 1 and len(intersection_list) == 1:
    # Either empty or a simple rule
    if gc_rule:
      return family, ParseSingleRule(gc_rule)
  else:
    raise exceptions.BadArgumentException(
        '--column-families',
        'Input column family ({0}) is mal-formatted.'.format(expr),
    )


def UpdateRequestWithInput(original_ref, args, req):
  """Parse argument and construct create table request.

  Args:
    original_ref: the gcloud resource.
    args: input arguments.
    req: the real request to be sent to backend service.

  Returns:
    req: the real request to be sent to backend service.
  """
  req.createTableRequest.tableId = args.table
  req.parent = original_ref.Parent().RelativeName()

  return req


def MakeSplits(split_list):
  """Convert a string list to a Split object.

  Args:
    split_list: A list that contains strings representing splitting points.

  Returns:
    A Split object.
  """
  results = []
  for split in split_list:
    results.append(util.GetAdminMessages().Split(key=split.encode('utf-8')))

  return results


def ConvertDurationToSeconds(duration, arg_name):
  """Convert a string of duration in any form to seconds.

  Args:
    duration: A string of any valid form of duration, such as `10d`, `1w`, `36h`
    arg_name: The name of the argument that the duration is passed in.

  Returns:
    A string of duration counted in seconds, such as `1000s`

  Raises:
    BadArgumentExpection: the input duration is mal-formatted.
  """
  try:
    return times.FormatDurationForJson(times.ParseDuration(duration))
  except times.DurationSyntaxError as duration_error:
    raise exceptions.BadArgumentException(
        arg_name,
        str(duration_error),
    )
  except times.DurationValueError as duration_error:
    raise exceptions.BadArgumentException(
        arg_name,
        str(duration_error),
    )


def ParseColumnFamilies(family_list):
  """Parses column families value object from the string list.

  Args:
    family_list: A list that contains one or more strings representing family
      name and optional GC rules in the format of `family_name[:gc_rule]`, such
      as `my_family_1,my_family_2:maxage=10d`.

  Returns:
    A column families value object.
  """
  results = []

  for expr in family_list:
    family, gc_rule = ParseExpr(expr)
    column_family = util.GetAdminMessages().ColumnFamily(gcRule=gc_rule)
    results.append(
        util.GetAdminMessages().Table.ColumnFamiliesValue.AdditionalProperty(
            key=family, value=column_family
        )
    )

  return util.GetAdminMessages().Table.ColumnFamiliesValue(
      additionalProperties=results
  )


def ParseChangeStreamRetentionPeriod(retention_period):
  """Parses change stream retention period from the string.

  Args:
    retention_period: Change stream retention period in the format of `3d` for 3
      days.

  Returns:
    A string of duration counted in seconds, such as `259200s`
  """
  return ConvertDurationToSeconds(
      retention_period, '--change-stream-retention-period'
  )


def ParseTieredStorageConfigDuration(duration):
  """Parses tiered storage config duration from the string.

  Args:
    duration: Tiered storage config duration in the format of a valid gcloud
      datetime duration string, such as `10d`, `1w`, `36h`.

  Returns:
    A string of duration counted in seconds, such as `259200s`
  """
  return ConvertDurationToSeconds(
      duration, '--tiered-storage-infrequent-access-older-than'
  )


def AddFieldToUpdateMask(field, req):
  """Adding a new field to the update mask of the updateTableRequest.

  Args:
    field: the field to be updated.
    req: the original updateTableRequest.

  Returns:
    req: the updateTableRequest with update mask refreshed.
  """
  update_mask = req.updateMask
  if update_mask:
    if update_mask.count(field) == 0:
      req.updateMask = update_mask + ',' + field
  else:
    req.updateMask = field
  return req


def RefreshUpdateMask(unused_ref, args, req):
  """Refresh the update mask of the updateTableRequest according to the input arguments.

  Args:
    unused_ref: the gcloud resource (unused).
    args: the input arguments.
    req: the original updateTableRequest.

  Returns:
    req: the updateTableRequest with update mask refreshed.
  """
  if args.clear_change_stream_retention_period:
    req = AddFieldToUpdateMask('changeStreamConfig', req)
  if args.change_stream_retention_period:
    req = AddFieldToUpdateMask('changeStreamConfig.retentionPeriod', req)
  if args.enable_automated_backup or args.disable_automated_backup:
    req = AddFieldToUpdateMask('automatedBackupPolicy', req)
  if args.automated_backup_retention_period:
    req = AddFieldToUpdateMask('automatedBackupPolicy.retentionPeriod', req)
  # TODO: b/418228423 - Remove this check once the flag is released in GA.
  #
  # We need this to verify that the flag exists in this release track.
  if hasattr(args, 'clear_tiered_storage_config'):
    if args.clear_tiered_storage_config:
      req = AddFieldToUpdateMask('tieredStorageConfig', req)
    if args.tiered_storage_infrequent_access_older_than:
      req = AddFieldToUpdateMask(
          'tieredStorageConfig.infrequentAccess.includeIfOlderThan', req
      )
  if args.row_key_schema_definition_file or args.clear_row_key_schema:
    req = AddFieldToUpdateMask('rowKeySchema', req)
  return req


def AddAdditionalArgs():
  """Adds additional flags."""
  return (
      AddChangeStreamConfigUpdateTableArgs()
      + AddAutomatedBackupPolicyUpdateTableArgs()
  )


def AddAdditionalArgsAlphaBeta():
  """Adds additional flags for alpha and beta."""
  return AddAdditionalArgs() + AddTieredStorageConfigUpdateTableArgs()


def AddChangeStreamConfigUpdateTableArgs():
  """Adds the change stream commands to update table CLI.

  This can't be defined in the yaml because that automatically generates the
  inverse for any boolean args and we don't want the nonsensical
  `no-clear-change-stream-retention-period`. We use store_const to only allow
  `clear-change-stream-retention-period` or `change-stream-retention-period`
  arguments

  Returns:
    Argument group containing change stream args
  """
  argument_group = base.ArgumentGroup(mutex=True)
  argument_group.AddArgument(
      base.Argument(
          '--clear-change-stream-retention-period',
          help=(
              'This disables the change stream and eventually removes the'
              ' change stream data.'
          ),
          action='store_const',
          const=True,
      )
  )
  argument_group.AddArgument(
      base.Argument(
          '--change-stream-retention-period',
          help=(
              'The length of time to retain change stream data for the table, '
              'in the range of [1 day, 7 days]. Acceptable units are days (d), '
              'hours (h), minutes (m), and seconds (s). If not already '
              'specified, enables a change stream for the table. Examples: `5d`'
              ' or `48h`.'
          ),
      )
  )
  return [argument_group]


def AddTieredStorageConfigUpdateTableArgs():
  """Adds the tiered storage config commands to update table CLI.

  This can't be defined in the yaml because that automatically generates the
  inverse for any boolean args and we don't want the nonsensical
  `no-clear-tiered-storage-config`. We use store_const to only allow
  `clear-tiered-storage-config`.

  Returns:
    Argument group containing tiered storage config args
  """
  argument_group = base.ArgumentGroup(mutex=True)
  argument_group.AddArgument(
      base.Argument(
          '--clear-tiered-storage-config',
          help='Disables the tiered storage config.',
          action='store_const',
          const=True,
      )
  )
  argument_group.AddArgument(
      base.Argument(
          '--tiered-storage-infrequent-access-older-than',
          help=(
              'The age at which data should be moved to infrequent access'
              ' storage.\n\nSee `$ gcloud topic datetimes` for information on'
              ' absolute duration formats.'
          ),
      )
  )
  return [argument_group]


def AddAutomatedBackupPolicyCreateTableArgs():
  """Adds automated backup policy commands to create table CLI.

  This can't be defined in the yaml because that automatically generates the
  inverse for any boolean args and we don't want the nonsensical
  `no-enable-automated-backup`. We use store_const to only allow
  `enable-automated-backup` argument.

  Returns:
    Argument group containing automated backup args.
  """
  argument_group = base.ArgumentGroup(mutex=True)
  argument_group.AddArgument(
      base.Argument(
          '--enable-automated-backup',
          help=(
              'Once set, enables the default automated backup policy'
              ' (retention_period=7d, frequency=1d) for the table.'
          ),
          action='store_const',
          const=True,
      )
  )
  argument_group.AddArgument(
      base.Argument(
          '--automated-backup-retention-period',
          help=(
              'The retention period of automated backup in the format of `30d`'
              ' for 30 days. Min retention period is `3d` and max is `90d`.'
              ' Setting this flag will enable automated backup for the table.'
          ),
      )
  )
  return [argument_group]


def AddAutomatedBackupPolicyUpdateTableArgs():
  """Adds automated backup policy commands to update table CLI."""
  argument_group = base.ArgumentGroup(mutex=True)
  argument_group.AddArgument(
      base.Argument(
          '--enable-automated-backup',
          help=(
              'Once set, enables the default automated backup policy'
              ' (retention_period=7d, frequency=1d) for the table. Note: If a'
              ' table has automated backup enabled, this flag resets it to the'
              ' default policy.'
          ),
          action='store_const',
          const=True,
      )
  )
  argument_group.AddArgument(
      base.Argument(
          '--disable-automated-backup',
          help='Once set, disables automated backup policy for the table.',
          action='store_const',
          const=True,
      )
  )
  argument_group.AddArgument(
      base.Argument(
          '--automated-backup-retention-period',
          help=(
              'The retention period of automated backup in the format of `30d`'
              ' for 30 days. Min retention period is `3d` and max is `90d`.'
              ' Setting this flag will enable automated backup for the table.'
          ),
      )
  )
  return [argument_group]


def HandleChangeStreamArgs(unused_ref, args, req):
  if args.change_stream_retention_period:
    req.table.changeStreamConfig = CreateChangeStreamConfig(
        args.change_stream_retention_period
    )
  return req


def HandleAutomatedBackupPolicyCreateTableArgs(unused_ref, args, req):
  """Handles automated backup policy args for create table CLI."""
  if args.enable_automated_backup:
    req.createTableRequest.table.automatedBackupPolicy = (
        CreateDefaultAutomatedBackupPolicy()
    )
  if args.automated_backup_retention_period:
    req.createTableRequest.table.automatedBackupPolicy = (
        # Keeping the frequency as None to be consistent with the UpdateTable
        # command.
        CreateAutomatedBackupPolicy(
            args.automated_backup_retention_period, None
        )
    )
  return req


def HandleAutomatedBackupPolicyUpdateTableArgs(unused_ref, args, req):
  """Handle automated backup policy args for update table CLI.

  If `enable_automated_backup` flag is set, add default policy to table. If
  `disable_automated_backup` flag is set, keep table.automatedBackupPolicy as
  empty, together with the update_mask, it will clear automated backup policy.
  If `automated_backup_retention_period` flag is set, add policy with given
  retention period to table.

  Args:
    unused_ref: the gcloud resource (unused).
    args: the input arguments.
    req: the original updateTableRequest.

  Returns:
    req: the updateTableRequest with automated backup policy handled.
  """
  if args.enable_automated_backup:
    req.table.automatedBackupPolicy = CreateDefaultAutomatedBackupPolicy()
  if args.automated_backup_retention_period:
    req.table.automatedBackupPolicy = CreateAutomatedBackupPolicy(
        args.automated_backup_retention_period, None
    )
  return req


def HandleTieredStorageArgs(unused_ref, args, req):
  """Handle tiered storage args for update table CLI.

  Args:
    unused_ref: the gcloud resource (unused).
    args: the input arguments.
    req: the original updateTableRequest.

  Returns:
    req: the updateTableRequest with tiered storage config handled.
  """
  # TODO: b/418228423 - Remove this check once the flag is released in GA.
  #
  # We need this to verify that the flag exists in this release track.
  if not hasattr(args, 'clear_tiered_storage_config'):
    return req

  if args.clear_tiered_storage_config:
    req.table.tieredStorageConfig = None
  if args.tiered_storage_infrequent_access_older_than:
    req.table.tieredStorageConfig = util.GetAdminMessages().TieredStorageConfig(
        infrequentAccess=util.GetAdminMessages().TieredStorageRule(
            includeIfOlderThan=ParseTieredStorageConfigDuration(
                args.tiered_storage_infrequent_access_older_than
            )
        )
    )

  return req


def CreateChangeStreamConfig(duration):
  return util.GetAdminMessages().ChangeStreamConfig(
      retentionPeriod=ConvertDurationToSeconds(
          duration, '--change-stream-retention-period'
      )
  )


def CreateAutomatedBackupPolicy(retention_period, frequency):
  """Constructs AutomatedBackupPolicy message with given values.

  Args:
    retention_period: The retention period of the automated backup policy.
    frequency: The frequency of the automated backup policy.

  Returns:
    AutomatedBackupPolicy with the specified policy config.
  """
  policy = util.GetAdminMessages().AutomatedBackupPolicy()
  if retention_period:
    policy.retentionPeriod = ConvertDurationToSeconds(
        retention_period, '--automated-backup-retention-period'
    )
  if frequency:
    policy.frequency = ConvertDurationToSeconds(
        frequency, '--automated-backup-frequency'
    )
  return policy


def CreateDefaultAutomatedBackupPolicy():
  """Constructs AutomatedBackupPolicy message with default values.

  The default values are: retention_period=7d, frequency=1d

  Returns:
    AutomatedBackupPolicy with default policy config.
  """
  return CreateAutomatedBackupPolicy('7d', '1d')


def Utf8ToBase64(s):
  """Encode a utf-8 string as a base64 string."""
  return six.ensure_text(base64.b64encode(six.ensure_binary(s)))


def HandleRowKeySchemaCreateTableArgs(unused_ref, args, req):
  """Handles row key schema create table args."""
  if args.row_key_schema_definition_file:
    req.createTableRequest.table.rowKeySchema = (
        ParseRowKeySchemaFromDefinitionFile(
            args.row_key_schema_definition_file,
            args.row_key_schema_pre_encoded_bytes,
        )
    )
  return req


def HandleRowKeySchemaUpdateTableArgs(unused_ref, args, req):
  """Handles row key schema update table args."""
  if args.row_key_schema_definition_file:
    req.table.rowKeySchema = ParseRowKeySchemaFromDefinitionFile(
        args.row_key_schema_definition_file,
        args.row_key_schema_pre_encoded_bytes,
    )

  if args.clear_row_key_schema:
    req.ignoreWarnings = True

  return req


def Base64EncodeBinaryFieldsInRowKeySchema(row_key_schema):
  """Encodes binary fields in the row key schema in Base64."""
  # We don't need to check for missing encoding here, as the admin API will
  # return an error if the encoding is missing.
  if (
      not row_key_schema
      or 'encoding' not in row_key_schema
      or 'delimitedBytes' not in row_key_schema['encoding']
      or 'delimiter' not in row_key_schema['encoding']['delimitedBytes']
      or not row_key_schema['encoding']['delimitedBytes']['delimiter']
  ):
    return row_key_schema

  row_key_schema['encoding']['delimitedBytes']['delimiter'] = Utf8ToBase64(
      row_key_schema['encoding']['delimitedBytes']['delimiter']
  )
  return row_key_schema


def ParseRowKeySchemaFromDefinitionFile(definition_file, pre_encoded):
  """Parses row key schema from the definition file.

  Args:
    definition_file: The path to the definition file. File must be in YAML or
      JSON format.
    pre_encoded: Whether all the binary fields in the row key schema (e.g.
      encoding.delimited_bytes.delimiter) are pre-encoded in Base64.

  Returns:
    A struct type object representing the row key schema.

  Raises:
    BadArgumentException if the definition file is not found, can't be
      read, or is not a valid YAML or JSON file.
    ValueError if the YAML/JSON object cannot be parsed as a valid row key
      schema.
  """
  row_key_schema_msg_type = (
      util.GetAdminMessages().GoogleBigtableAdminV2TypeStruct
  )
  try:
    row_key_schema_to_parse = yaml.load_path(definition_file)
    if not pre_encoded:
      Base64EncodeBinaryFieldsInRowKeySchema(row_key_schema_to_parse)
    parsed_row_key_schema = encoding.PyValueToMessage(
        row_key_schema_msg_type, row_key_schema_to_parse
    )
  except (yaml.FileLoadError, yaml.YAMLParseError) as e:
    raise exceptions.BadArgumentException('--row-key-schema-definition-file', e)
  except AttributeError as e:
    raise ValueError(
        'File [{0}] cannot be parsed as a valid row key schema. [{1}]'.format(
            definition_file, e
        )
    )
  return parsed_row_key_schema