HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/metastore/validators.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Resource parsing helpers."""

# TODO(b/331622809): Review and fix any violations of naming conventions.

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import ipaddress
import re
from typing import Any
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.metastore import parsers


STRING_MAX_LENGTH = 1000
METASTORE_TYPE_DICT = {
    'dpms': 'DATAPROC_METASTORE',
    'dataplex': 'DATAPLEX',
    'bigquery': 'BIGQUERY',
}
METASTORE_RESOURCE_PATH_DICT = {'dpms': 'services', 'dataplex': 'lakes'}


def ValidatePort(port):
  """Python hook to validate that the port is between 1024 and 65535, inclusive."""
  if port < 1024 or port > 65535:
    raise exceptions.BadArgumentException(
        '--port', 'Port ({0}) is not in the range [1025, 65535].'.format(port)
    )
  return port


def ValidateScalingFactor(scaling_factor):
  """Python hook to validate the scaling factor."""
  return ValidateScalingFactorFloat(scaling_factor, '--scaling-factor')


def ValidateMinScalingFactor(scaling_factor):
  """Python hook to validate the min scaling factor."""
  return ValidateScalingFactorFloat(scaling_factor, '--min-scaling-factor')


def ValidateMaxScalingFactor(scaling_factor):
  """Python hook to validate the max scaling factor."""
  return ValidateScalingFactorFloat(scaling_factor, '--max-scaling-factor')


def ValidateScalingFactorFloat(scaling_factor, flag_name):
  """Validate the scaling factor float value."""
  if scaling_factor < 0.1 or scaling_factor > 6:
    raise exceptions.BadArgumentException(
        flag_name,
        'Scaling factor ({0}) is not in the range [0.1, 6.0].'.format(
            scaling_factor
        ),
    )
  elif scaling_factor < 1 and scaling_factor * 10 % 1 != 0:
    raise exceptions.BadArgumentException(
        flag_name,
        'Scaling factor less than 1.0 ({0}) should be a'
        ' multiple of 0.1 (e.g. (0.1, 0.2, 0.3))'.format(scaling_factor),
    )
  elif scaling_factor >= 1 and scaling_factor % 1.0 != 0:
    raise exceptions.BadArgumentException(
        flag_name,
        'Scaling greater than 1.0 ({0}) should be a multiple'
        ' of 1.0 (e.g. (1.0, 2.0, 3.0))'.format(scaling_factor),
    )
  return scaling_factor


def ValidateGcsUri(arg_name):
  """Validates the gcs uri is formatted correctly."""

  def Process(gcs_uri):
    if not gcs_uri.startswith('gs://'):
      raise exceptions.BadArgumentException(
          arg_name, 'Expected URI {0} to start with `gs://`.'.format(gcs_uri)
      )
    return gcs_uri

  return Process


def ValidateKerberosPrincipal(kerberos_principal):
  pattern = re.compile(r'^(.+)/(.+)@(.+)$')
  if not pattern.match(kerberos_principal):
    raise exceptions.BadArgumentException(
        '--kerberos-principal',
        'Kerberos Principal {0} does not match ReGeX {1}.'.format(
            kerberos_principal, pattern
        ),
    )
  return kerberos_principal


def ValidateHourOfDay(hour):
  """Validates that the hour falls between 0 and 23, inclusive."""
  if hour < 0 or hour > 23:
    raise exceptions.BadArgumentException(
        '--maintenance-window-hour-of-day',
        'Hour of day ({0}) is not in [0, 23].'.format(hour),
    )
  return hour


def ValidateStringField(arg_name):
  """Validates that the string field is not longer than STRING_MAX_LENGTH, to avoid abuse issues."""
  if len(arg_name) > STRING_MAX_LENGTH:
    raise exceptions.BadArgumentException(
        arg_name,
        'The string field can not be longer than {0} characters.'.format(
            STRING_MAX_LENGTH
        ),
    )
  return arg_name


def ValidateCloudSqlInstanceConnectionName(connection_name):
  """Validates the connection name of a CloudSQL instance, must be in the form '{project_id}:{region}:{instance_id}'.

  Args:
    connection_name: The CloudSQL instance connection name string.

  Returns:
    The connection name string.
  Raises:
    BadArgumentException: when the input string does not match the pattern.
  """
  pattern = re.compile(r'^([^:]+:){2}[^:]+$')
  if not pattern.match(connection_name):
    raise exceptions.BadArgumentException(
        '--instance-connection-name',
        'The instance connection name should be in the format'
        ' project_id:region:instance_id',
    )
  return connection_name


def ValidateNetworkResourceName(arg_name):
  """Validates the resource name of a compute network, must be in the form 'projects/{project_id}/global/networks/{network_id}'."""

  def Process(resource_name):
    pattern = re.compile(r'^projects/[^/]+/global/networks/[^/]+$')
    if not pattern.match(resource_name):
      raise exceptions.BadArgumentException(
          arg_name,
          'The network resource name should be in the format'
          ' projects/<project_id>/global/networks/<network_id>',
      )
    return resource_name

  return Process


def ValidateSubnetworkResourceName(arg_name):
  """Validates the resource name of a compute subnetwork, must be in the form 'projects/{project_id}/regions/{region_id}/subnetworks/{subnetwork_id}'."""

  def Process(resource_name):
    pattern = re.compile(r'^projects/[^/]+/regions/[^/]+/subnetworks/[^/]+$')
    if not pattern.match(resource_name):
      raise exceptions.BadArgumentException(
          arg_name,
          'The subnetwork resource name should be in the format'
          ' projects/{project_id}/regions/{region_id}/subnetworks/{subnetwork_id}',
      )
    return resource_name

  return Process


def ValidateHiveDatabaseName(db_name):
  """Validates the hive database name.

  Args:
    db_name: the hive database name.

  Returns:
    the hive database name.
  Raises:
    BadArgumentException: when the database name doesn't conform to the pattern
    or is longer than 64 characters.
  """

  pattern = re.compile(r'^[0-9a-zA-Z$_-]+$')
  if not pattern.match(db_name):
    raise exceptions.BadArgumentException(
        '--hive-database-name',
        'hive database name must start with an alphanumeric character, and'
        ' contain only the following characters: letters, numbers, dashes (-),'
        ' and underscores (_).',
    )
  if len(db_name) > 64:
    raise exceptions.BadArgumentException(
        '--hive-database-name',
        'hive database name must be less than 64 characters.',
    )
  return db_name


def ValidateCloudSqlIpAddress(ip_address):
  """Validates the Cloud SQL IP address.

  Args:
    ip_address: the Cloud SQL IP address.

  Returns:
    the IP address.
  Raises:
    BadArgumentException: when the IP address is invalid.
  """

  try:
    ipaddress.IPv4Address(ip_address)
    return ip_address
  except ValueError:
    raise exceptions.BadArgumentException(
        '--ip-address',
        'Invalid IP address.',
    )


def ValidateSubnetIpRange(cidr):
  """Validates the subnet IP range.

  Args:
    cidr: the CIDR range for the subnet.

  Returns:
    the CIDR range.
  Raises:
    BadArgumentException: when the CIDR range is invalid.
  """

  def IsCidrWithinValidRanges(cidr):
    """Checks if a given CIDR block is contained within a list of valid CIDR ranges."""

    # Valid CIDR ranges
    rfc_1918_spaces = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
    rfc_6598_spaces = ['100.64.0.0/10']
    rfc_6890_spaces = ['192.0.0.0/24']
    rfc_5737_spaces = ['192.0.2.0/24', '198.51.100.0/24', '203.0.113.0/24']
    rfc_7526_spaces = ['192.88.99.0/24']
    rfc_2544_spaces = ['198.18.0.0/15']

    valid_cidr_ranges = (
        rfc_1918_spaces
        + rfc_6598_spaces
        + rfc_6890_spaces
        + rfc_5737_spaces
        + rfc_7526_spaces
        + rfc_2544_spaces
    )
    cidr_block = ipaddress.IPv4Network(cidr)
    for valid_range in valid_cidr_ranges:
      if cidr_block.subnet_of(ipaddress.IPv4Network(valid_range)):
        return True
    return False

  try:
    if not IsCidrWithinValidRanges(cidr):
      raise exceptions.BadArgumentException(
          '--subnet-ip-range',
          'The subnet IP range is invalid, see'
          ' https://cloud.google.com/vpc/docs/subnets.md#valid-ranges',
      )
  except ValueError:
    raise exceptions.BadArgumentException(
        '--subnet-ip-range',
        'Invalid CIDR address block.',
    )

  return cidr


def ValidateMigrationBucketName(bucket_name):
  """Validates the Cloud Storage bucket name used for CDC during migration, should not start with 'gs://'.

  Args:
    bucket_name: the Cloud Storage bucket name.

  Returns:
    the Cloud Storage bucket name.
  Raises:
    BadArgumentException: when the Cloud Storage bucket name doesn't conform to
    the pattern.
  """

  pattern = re.compile(r'^(?!gs://)([a-z0-9\._-]+)$')
  if not pattern.match(bucket_name):
    raise exceptions.BadArgumentException(
        '--bucket',
        'Invalid bucket name',
    )
  return bucket_name


def ValidateMigrationRootPath(root_path):
  """Validates the root path inside the Cloud Storage bucket used for CDC during migration, must start with a forward slash ('/') character.

  Args:
    root_path: the root path inside the Cloud Storage bucket.

  Returns:
    the root path.
  Raises:
    BadArgumentException: when the root path is invalid.
  """

  pattern = re.compile(r'^/([^\n\r]*)$')
  if not pattern.match(root_path):
    raise exceptions.BadArgumentException(
        '--root-path',
        'Invalid root path',
    )
  return root_path


def ValidateServiceMutexConfig(unused_ref, unused_args, req):
  """Validates that the mutual exclusive configurations of Dataproc Metastore service are not set at the same time.

  Args:
    req: A request with `service` field.

  Returns:
    A request without service mutex configuration conflicts.
  Raises:
    BadArgumentException: when mutual exclusive configurations of service are
    set at the same time.
  """
  return ValidateServiceMutexConfigForV1(unused_ref, unused_args, req)


def ValidateServiceMutexConfigForV1(unused_ref, unused_args, req):
  """Validates exclusively for V1 fields that the mutual exclusive configurations of Dataproc Metastore service are not set at the same time.

  Args:
    req: A request with `service` field.

  Returns:
    A request without service mutex configuration conflicts.
  Raises:
    BadArgumentException: when mutual exclusive configurations of service are
    set at the same time.
  """
  if (
      req.service.hiveMetastoreConfig
      and req.service.hiveMetastoreConfig.kerberosConfig
      and req.service.hiveMetastoreConfig.kerberosConfig.principal
      and _IsNetworkConfigPresentInService(req.service)
  ):
    raise exceptions.BadArgumentException(
        '--kerberos-principal',
        'Kerberos configuration cannot be used in conjunction with'
        ' --network-config-from-file or --consumer-subnetworks.',
    )
  return req


def ValidateScheduledBackupConfigs(unused_ref, args, req):
  """Validates that the cron_schedule and backup_location are set when the scheduled backup is enabled.

  Args:
    unused_ref: A resource ref to the parsed metastore service resource.
    args: The parsed args namespace from CLI.
    req: A request with `service` field.

  Returns:
    A request with service scheduled backups configurations required.
  Raises:
    BadArgumentException: when cron_schedule and backup_location are not set
    when the scheduled backup is enabled.
  """

  args_set = set(args.GetSpecifiedArgNames())
  if (
      req.service.scheduledBackup.enabled
      and '--scheduled-backup-cron' not in args_set
  ):
    raise exceptions.BadArgumentException(
        '--scheduled-backup-cron',
        '--scheduled-backup-cron must be set when the scheduled backup is'
        ' enabled.',
    )
  if (
      req.service.scheduledBackup.enabled
      and '--scheduled-backup-location' not in args_set
  ):
    raise exceptions.BadArgumentException(
        '--scheduled-backup-location',
        '--scheduled-backup-location must be set when the scheduled backup is'
        ' enabled.',
    )
  return req


def _IsNetworkConfigPresentInService(service):
  return service.networkConfig and service.networkConfig.consumers


def ValidateClearBackends(unused_ref, args, update_federation_req):
  """Validate if users run update federation command with --clear-backends arg only.

  Args:
    unused_ref: A resource ref to the parsed Federation resource.
    args: The parsed args namespace from CLI.
    update_federation_req: The request for the API call.

  Returns:
    String request
  Raises:
    BadArgumentException: When users run update federation command with
    --clear-backends arg only.
  """

  args_set = set(args.GetSpecifiedArgNames())
  if '--clear-backends' in args_set and '--update-backends' not in args_set:
    raise exceptions.BadArgumentException(
        '--clear-backends',
        '--clear-backends must be used with --update-backends',
    )
  return update_federation_req


def _IsZeroOrPositiveNumber(string):
  if string.isdigit():
    return int(string) >= 0
  return False


def _GetMetastoreTypeFromDict(dictionary):
  return '|'.join(value for key, value in dictionary.items())


def _GenerateShortOrLongBackendNames(metastore_type_and_name):
  """Validate and process the format of short and long names for backends.

  Args:
    metastore_type_and_name: Metastore type and name.

  Returns:
    String backend name.

  Raises:
    BadArgumentException: When the input backend(s) are invalid
  """

  if metastore_type_and_name[0].lower() == 'bigquery':
    long_name_regex = r'^projects\/.*[^\/]'
  else:
    long_name_regex = (
        r'^projects\/.*[^\/]\/locations\/.[^\/]*\/('
        + _GetMetastoreTypeFromDict(METASTORE_RESOURCE_PATH_DICT)
        + r')\/.[^\/]*$'
    )
  if '/' in metastore_type_and_name[1]:
    if re.search(long_name_regex, metastore_type_and_name[1]):
      return metastore_type_and_name[1]
    else:
      raise exceptions.BadArgumentException(
          '--backends', 'Invalid backends format'
      )
  else:
    if metastore_type_and_name[0].lower() == 'bigquery':
      return 'projects/' + metastore_type_and_name[1]
    else:
      return (
          '{0}/'
          + METASTORE_RESOURCE_PATH_DICT[metastore_type_and_name[0]]
          + '/'
          + metastore_type_and_name[1]
      )


def ValidateBackendsAndReturnMetastoreDict(backends):
  """Validate backends argument if it has correct format, metastore type and the keys are positive number and not duplicated.

  In addition, parsing the backends to backend metastore dict

  Args:
    backends: A string is passed by user in format
      <key>=<metastore_type>:<name>,... For example:
      1=dpms:dpms1,2=dataplex:lake1

  Returns:
    Backend metastore dict
  Raises:
    BadArgumentException: When the input backends is invalid or duplicated keys
  """
  backend_dict = {}

  if not backends:
    raise exceptions.BadArgumentException('--backends', 'Cannot be empty')
  backend = backends.split(',')
  for data in backend:
    rank_and_metastore = data.split('=')
    if len(rank_and_metastore) != 2:
      raise exceptions.BadArgumentException(
          '--backends', 'Invalid backends format'
      )
    key = rank_and_metastore[0]
    if not _IsZeroOrPositiveNumber(key):
      raise exceptions.BadArgumentException(
          '--backends',
          'Invalid backends format or key of backend is less than 0',
      )
    value = rank_and_metastore[1]
    metastore_type_and_name = value.split(':')
    if len(metastore_type_and_name) != 2:
      raise exceptions.BadArgumentException(
          '--backends', 'Invalid backends format'
      )
    if key in backend_dict:
      raise exceptions.BadArgumentException(
          '--backends', 'Duplicated keys of backends'
      )
    if metastore_type_and_name[0] not in METASTORE_TYPE_DICT.keys():
      raise exceptions.BadArgumentException(
          '--backends', 'Invalid backends type'
      )
    generated_name = _GenerateShortOrLongBackendNames(metastore_type_and_name)
    backend_metastores_dict = {
        'name': generated_name,
        'metastoreType': METASTORE_TYPE_DICT[metastore_type_and_name[0]],
    }
    backend_dict[key] = backend_metastores_dict
  return backend_dict


def ParseBackendsIntoRequest(job_ref, request):
  """Generate the long backend name of Dataproc Metastore federation requests.

  Args:
    job_ref: A resource ref to the parsed Federation resource.
    request: The request for the API call.

  Returns:
    Modified request for the API call.
  """

  for prop in request.federation.backendMetastores.additionalProperties:
    prop.value.name = prop.value.name.format(job_ref.Parent().RelativeName())
  return request


def ValidateKmsKeys(unused_ref, unused_args, req):
  """Validates that the kms keys are valid.

  Args:
    req: A request with `service` field.

  Returns:
    The unchaged request.
  Raises:
      InvalidResourceException: If the line is invalid.
      RequiredFieldOmittedException: If resource is underspecified.
      UnknownCollectionException: If no collection is provided or can be
          inferred.
      WrongResourceCollectionException: If the provided URL points into a
          collection other than the one specified.
  """

  if (req.service.encryptionConfig is None or
      req.service.encryptionConfig.kmsKeys is None):
    return req
  for kms_key in req.service.encryptionConfig.kmsKeys:
    parsers.ParseCloudKmsKey(kms_key)
  return req