HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/sql/instance_prop_reducers.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reducer functions to generate instance props from prior state and flags."""


from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import argparse
import datetime

from googlecloudsdk.api_lib.sql import api_util as common_api_util
from googlecloudsdk.api_lib.sql import constants
from googlecloudsdk.api_lib.sql import exceptions as sql_exceptions
from googlecloudsdk.api_lib.sql import instances as api_util
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import files
import six


def ActiveDirectoryConfig(
    sql_messages,
    domain=None,
    mode=None,
    dns_servers=None,
    admin_credential_secret_key=None,
    organizational_unit=None,
):
  """Builds the Active Directory configuration message for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    domain: string, the Active Directory domain value.
    mode: string, the Active Directory mode value.
    dns_servers: list of strings, the list of dns servers.
    admin_credential_secret_key: string, the name of the admin credential
      secret manager key.
    organizational_unit: string, the organizational unit value.

  Returns:
    sql_messages.SqlActiveDirectoryConfig object, or None when no Active
    Directory flag was supplied.
  """
  supplied = (
      domain,
      mode,
      dns_servers,
      admin_credential_secret_key,
      organizational_unit,
  )
  if all(value is None for value in supplied):
    return None

  ad_config = sql_messages.SqlActiveDirectoryConfig()
  if domain is not None:
    ad_config.domain = domain
  if admin_credential_secret_key is not None:
    ad_config.adminCredentialSecretName = admin_credential_secret_key
  if mode is not None:
    # Flag values are lower-case; the API enum names are upper-case.
    mode_enum = sql_messages.SqlActiveDirectoryConfig.ModeValueValuesEnum
    ad_config.mode = mode_enum.lookup_by_name(mode.upper())
  if dns_servers:
    ad_config.dnsServers = dns_servers
  if organizational_unit is not None:
    ad_config.organizationalUnit = organizational_unit
  return ad_config


def SqlServerAuditConfig(sql_messages,
                         bucket=None,
                         retention_interval=None,
                         upload_interval=None):
  """Builds the SQL Server audit configuration for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    bucket: string, the GCS bucket name.
    retention_interval: duration, how long to keep generated audit files.
    upload_interval: duration, how often to upload generated audit files.

  Returns:
    sql_messages.SqlServerAuditConfig object, or None when no audit flag was
    supplied.
  """
  if bucket is None and retention_interval is None and upload_interval is None:
    return None

  audit_config = sql_messages.SqlServerAuditConfig()
  if bucket is not None:
    audit_config.bucket = bucket
  # Durations are serialized for the API as '<seconds>s'.
  if retention_interval is not None:
    audit_config.retentionInterval = six.text_type(retention_interval) + 's'
  if upload_interval is not None:
    audit_config.uploadInterval = six.text_type(upload_interval) + 's'
  return audit_config


def BackupConfiguration(
    sql_messages,
    instance=None,
    backup_enabled=None,
    backup_location=None,
    backup_start_time=None,
    enable_bin_log=None,
    enable_point_in_time_recovery=None,
    retained_backups_count=None,
    retained_transaction_log_days=None,
    patch_request=False,
):
  """Generates the backup configuration for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    instance: sql_messages.DatabaseInstance, the original instance, if the
      previous state is needed.
    backup_enabled: boolean, True if backup should be enabled; None when the
      flag was not supplied.
    backup_location: string, location where to store backups by default.
    backup_start_time: string, start time of backup specified in 24-hour format.
    enable_bin_log: boolean, True if binary logging should be enabled.
    enable_point_in_time_recovery: boolean, True if point-in-time recovery
      (using write-ahead log archiving) should be enabled.
    retained_backups_count: int, how many backups to keep stored.
    retained_transaction_log_days: int, how many days of transaction logs to
      keep stored.
    patch_request: boolean, True if this is a patch request.

  Returns:
    sql_messages.BackupConfiguration object, or None when no backup-related
    flag was supplied.

  Raises:
    sql_exceptions.ArgumentError: Bad combination of arguments.
  """
  # NOTE: `not backup_enabled` is True both for --no-backup (False) and when
  # the flag is absent (None), so a config is generated in either case; only
  # backup_enabled=True with no other backup flag skips generation.
  should_generate_config = any([
      backup_location is not None,
      backup_start_time,
      enable_bin_log is not None,
      enable_point_in_time_recovery is not None,
      retained_backups_count is not None,
      retained_transaction_log_days is not None,
      not backup_enabled,
  ])

  if not should_generate_config:
    return None

  # Start from the instance's existing configuration when one is available so
  # a patch only overrides the fields that were requested.
  if not instance or not instance.settings.backupConfiguration:
    backup_config = sql_messages.BackupConfiguration(
        kind='sql#backupConfiguration',
        startTime='00:00',
        enabled=backup_enabled)
  else:
    backup_config = instance.settings.backupConfiguration

  # ENHANCED-tier backups are managed by an attached BackupDR plan and may not
  # be modified through a patch request.
  gcbdr_managed = (
      backup_config.backupTier
      == sql_messages.BackupConfiguration.BackupTierValueValuesEnum.ENHANCED
  )

  if patch_request and gcbdr_managed:
    raise sql_exceptions.ArgumentError(
        'Backup configuration cannot be changed for instances with a BackupDR'
        ' backup plan attached.'
    )

  # Each backup-shaping flag below implicitly (re-)enables backups.
  if backup_location is not None:
    backup_config.location = backup_location
    backup_config.enabled = True
  if backup_start_time:
    backup_config.startTime = backup_start_time
    backup_config.enabled = True

  if retained_backups_count is not None:
    # Reuse the existing retention settings message when present so other
    # retention fields are preserved.
    backup_retention_settings = (
        backup_config.backupRetentionSettings or
        sql_messages.BackupRetentionSettings())
    backup_retention_settings.retentionUnit = sql_messages.BackupRetentionSettings.RetentionUnitValueValuesEnum.COUNT
    backup_retention_settings.retainedBackups = retained_backups_count

    backup_config.backupRetentionSettings = backup_retention_settings
    backup_config.enabled = True

  if retained_transaction_log_days is not None:
    backup_config.transactionLogRetentionDays = retained_transaction_log_days
    backup_config.enabled = True

  # --no-backup conflicts with every flag handled above, since those flags
  # force backups on. NOTE(review): this branch is also taken when
  # backup_enabled is None — callers appear to pass an explicit boolean here;
  # confirm against call sites.
  if not backup_enabled:
    if (backup_location is not None or backup_start_time or
        retained_backups_count is not None or
        retained_transaction_log_days is not None):
      raise sql_exceptions.ArgumentError(
          'Argument --no-backup not allowed with --backup-location, '
          '--backup-start-time, --retained-backups-count, or '
          '--retained-transaction-log-days')
    backup_config.enabled = False

  if enable_bin_log is not None:
    backup_config.binaryLogEnabled = enable_bin_log

  if enable_point_in_time_recovery is not None:
    backup_config.pointInTimeRecoveryEnabled = enable_point_in_time_recovery
    # Mirror the value into replicationLogArchivingEnabled, but only when that
    # field is already populated on the existing config.
    if backup_config.replicationLogArchivingEnabled is not None:
      backup_config.replicationLogArchivingEnabled = (
          enable_point_in_time_recovery
      )

  # retainedTransactionLogDays is only valid when we have transaction logs,
  # i.e, have binlog or pitr.
  if (
      retained_transaction_log_days
      and not backup_config.binaryLogEnabled
      and not backup_config.pointInTimeRecoveryEnabled
  ):
    raise sql_exceptions.ArgumentError(
        'Argument --retained-transaction-log-days only valid when '
        'transaction logs are enabled. To enable transaction logs, use '
        '--enable-bin-log for MySQL, and use --enable-point-in-time-recovery '
        'for Postgres and SQL Server.'
    )

  return backup_config


def DatabaseFlags(sql_messages,
                  settings=None,
                  database_flags=None,
                  clear_database_flags=False):
  """Builds the list of database flag messages for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    settings: sql_messages.Settings, the original settings, if the previous
      state is needed.
    database_flags: dict of flags.
    clear_database_flags: boolean, True if flags should be cleared.

  Returns:
    list of sql_messages.DatabaseFlags objects
  """
  if database_flags:
    # Sort by flag name for a deterministic request body.
    return [
        sql_messages.DatabaseFlags(name=flag_name, value=flag_value)
        for flag_name, flag_value in sorted(database_flags.items())
    ]
  if clear_database_flags:
    return []
  if settings:
    return settings.databaseFlags
  return []


def Tags(sql_messages, tags=None):
  """Builds the list of tag messages for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    tags: list of tags.

  Returns:
    list of sql_messages.Tags objects
  """
  if not tags:
    return []
  return [sql_messages.Tags(tag=tag_value) for tag_value in tags]


def MaintenanceWindow(sql_messages,
                      instance,
                      maintenance_release_channel=None,
                      maintenance_window_day=None,
                      maintenance_window_hour=None):
  """Builds the maintenance window message for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    instance: sql_messages.DatabaseInstance, The original instance, if it might
      be needed to generate the maintenance window.
    maintenance_release_channel: string, which channel's updates to apply.
    maintenance_window_day: string, maintenance window day of week.
    maintenance_window_hour: int, maintenance window hour of day.

  Returns:
    sql_messages.MaintenanceWindow or None

  Raises:
    argparse.ArgumentError: no maintenance window specified.
  """
  channel = maintenance_release_channel
  day = maintenance_window_day
  hour = maintenance_window_hour
  # Truthiness is deliberate here: hour=0 alone does not trigger generation.
  if not (channel or day or hour):
    return None

  window = sql_messages.MaintenanceWindow(kind='sql#maintenanceWindow')

  # Without an existing window, day and hour must be given together.
  has_existing_window = bool(instance and instance.settings and
                             instance.settings.maintenanceWindow)
  if not has_existing_window and (day is None) != (hour is None):
    raise argparse.ArgumentError(
        None, 'There is currently no maintenance window on the instance. '
        'To add one, specify values for both day, and hour.')

  if channel:
    # Map the UI channel name onto the API update track enum.
    track_enum = sql_messages.MaintenanceWindow.UpdateTrackValueValuesEnum
    window.updateTrack = {
        'week5': track_enum.week5,
        'production': track_enum.stable,
        'preview': track_enum.canary,
    }[channel]
  if day:
    # DAYS indexes Sunday as 0, but the API numbers days 1-7 (Sunday = 7).
    window.day = arg_parsers.DayOfWeek.DAYS.index(day) or 7
  if hour is not None:  # hour may legitimately be 0 (midnight)
    window.hour = hour
  return window


def DenyMaintenancePeriod(sql_messages,
                          instance,
                          deny_maintenance_period_start_date=None,
                          deny_maintenance_period_end_date=None,
                          deny_maintenance_period_time='00:00:00'):
  """Builds the deny maintenance period message for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    instance: sql_messages.DatabaseInstance, The original instance, if it might
      be needed to generate the deny maintenance period.
    deny_maintenance_period_start_date: date, Date when the deny maintenance
      period begins, i.e., 2020-11-01.
    deny_maintenance_period_end_date: date, Date when the deny maintenance
      period ends, i.e., 2021-01-10.
    deny_maintenance_period_time: Time when the deny maintenance period
      starts/ends, i.e., 05:00:00.

  Returns:
    sql_messages.DenyMaintenancePeriod or None

  Raises:
    argparse.ArgumentError: invalid deny maintenance period specified.
  """
  # Only the first existing period (if any) is considered for updates.
  existing_period = None
  if (instance and instance.settings and
      instance.settings.denyMaintenancePeriods and
      instance.settings.denyMaintenancePeriods[0]):
    existing_period = instance.settings.denyMaintenancePeriods[0]

  if existing_period is not None:
    # Update the existing period in place; unspecified fields are kept.
    period = existing_period
    if deny_maintenance_period_start_date:
      ValidateDate(deny_maintenance_period_start_date)
      period.startDate = deny_maintenance_period_start_date
    if deny_maintenance_period_end_date:
      ValidateDate(deny_maintenance_period_end_date)
      period.endDate = deny_maintenance_period_end_date
  else:
    # Creating a new period requires both endpoints.
    if not (deny_maintenance_period_start_date and
            deny_maintenance_period_end_date):
      raise argparse.ArgumentError(
          None, 'There is no deny maintenance period on the instance.'
          ' To add one, specify values for both start date and end date.')
    period = sql_messages.DenyMaintenancePeriod()
    ValidateDate(deny_maintenance_period_start_date)
    period.startDate = deny_maintenance_period_start_date
    ValidateDate(deny_maintenance_period_end_date)
    period.endDate = deny_maintenance_period_end_date

  if deny_maintenance_period_time:
    ValidTime(deny_maintenance_period_time)
    period.time = deny_maintenance_period_time

  return period


def ValidTime(s):
  """Validates a time-of-day string in HH:mm:SS format.

  Args:
    s: string, the time value to validate, e.g. '05:00:00'.

  Returns:
    datetime.datetime, the parsed value (the date part defaults to
    1900-01-01). Returning the parsed value makes this consistent with
    ValidateDate; existing callers that ignore the return value are
    unaffected.

  Raises:
    argparse.ArgumentError: s is not a valid HH:mm:SS time.
  """
  try:
    return datetime.datetime.strptime(s, '%H:%M:%S')
  except ValueError:
    raise argparse.ArgumentError(
        None, 'Invalid time value. The format should be HH:mm:SS.')


def ValidateDate(s):
  """Parses a date string in yyyy-mm-dd or mm-dd format.

  Args:
    s: string, the date value to validate.

  Returns:
    datetime.datetime, the parsed value (missing components default per
    strptime, e.g. year 1900 for the mm-dd form).

  Raises:
    argparse.ArgumentError: s matches neither accepted format.
  """
  for date_format in ('%Y-%m-%d', '%m-%d'):
    try:
      return datetime.datetime.strptime(s, date_format)
    except ValueError:
      continue
  raise argparse.ArgumentError(
      None, 'Invalid date value. The format should be yyyy-mm-dd or mm-dd.')


def InsightsConfig(sql_messages,
                   insights_config_query_insights_enabled=None,
                   insights_config_query_string_length=None,
                   insights_config_record_application_tags=None,
                   insights_config_record_client_address=None,
                   insights_config_query_plans_per_minute=None):
  """Builds the query insights config message for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    insights_config_query_insights_enabled: boolean, True if query insights
      should be enabled.
    insights_config_query_string_length: number, length of the query string to
      be stored.
    insights_config_record_application_tags: boolean, True if application tags
      should be recorded.
    insights_config_record_client_address: boolean, True if client address
      should be recorded.
    insights_config_query_plans_per_minute: number, number of query plans to
      sample every minute.

  Returns:
    sql_messages.InsightsConfig or None when no insights flag was supplied.
  """
  # Map each message field to the flag value that populates it.
  field_values = {
      'queryInsightsEnabled': insights_config_query_insights_enabled,
      'queryStringLength': insights_config_query_string_length,
      'recordApplicationTags': insights_config_record_application_tags,
      'recordClientAddress': insights_config_record_client_address,
      'queryPlansPerMinute': insights_config_query_plans_per_minute,
  }
  if all(value is None for value in field_values.values()):
    return None

  insights_config = sql_messages.InsightsConfig()
  for field, value in field_values.items():
    if value is not None:
      setattr(insights_config, field, value)
  return insights_config


def DbAlignedAtomicWritesConfig(sql_messages, db_aligned_atomic_writes=None):
  """Builds the DB-aligned atomic writes config, or None when the flag is unset."""
  config = None
  if db_aligned_atomic_writes is not None:
    config = sql_messages.DbAlignedAtomicWritesConfig(
        dbAlignedAtomicWrites=db_aligned_atomic_writes)
  return config


def ConnectionPoolConfig(
    sql_messages,
    enable_connection_pooling=None,
    connection_pool_flags=None,
    clear_connection_pool_flags=None,
    current_config=None,
):
  """Builds the connection pooling config for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    enable_connection_pooling: boolean, True to enable pooling.
    connection_pool_flags: dict, pooling flags to set.
    clear_connection_pool_flags: boolean, True to clear existing flags.
    current_config: sql_messages.ConnectionPoolConfig, existing config to
      update in place, if any.

  Returns:
    sql_messages.ConnectionPoolConfig, or None when nothing was requested.
  """
  nothing_requested = (enable_connection_pooling is None and
                       connection_pool_flags is None and
                       clear_connection_pool_flags is None)
  if nothing_requested:
    return None

  pool_config = current_config or sql_messages.ConnectionPoolConfig()
  if enable_connection_pooling is not None:
    pool_config.connectionPoolingEnabled = enable_connection_pooling

  if connection_pool_flags is not None:
    # Sorted for a deterministic request body.
    pool_config.flags = [
        sql_messages.ConnectionPoolFlags(name=flag, value=value)
        for flag, value in sorted(connection_pool_flags.items())
    ]
  elif clear_connection_pool_flags:
    pool_config.flags = []

  return pool_config


def ReadPoolAutoScaleConfig(
    sql_messages,
    auto_scale_enabled=None,
    auto_scale_min_node_count=None,
    auto_scale_max_node_count=None,
    auto_scale_target_metrics=None,
    auto_scale_disable_scale_in=None,
    auto_scale_in_cooldown_seconds=None,
    auto_scale_out_cooldown_seconds=None,
    current_config=None,
):
  """Builds the read pool auto-scale config for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    auto_scale_enabled: boolean, whether auto-scale is enabled.
    auto_scale_min_node_count: int, minimum node count.
    auto_scale_max_node_count: int, maximum node count.
    auto_scale_target_metrics: dict of metric name to target value.
    auto_scale_disable_scale_in: boolean, True to disable scale-in.
    auto_scale_in_cooldown_seconds: int, scale-in cooldown in seconds.
    auto_scale_out_cooldown_seconds: int, scale-out cooldown in seconds.
    current_config: sql_messages.ReadPoolAutoScaleConfig, existing config to
      update in place, if any.

  Returns:
    sql_messages.ReadPoolAutoScaleConfig, or None when nothing was requested.
  """
  requested = (
      auto_scale_enabled,
      auto_scale_min_node_count,
      auto_scale_max_node_count,
      auto_scale_target_metrics,
      auto_scale_disable_scale_in,
      auto_scale_in_cooldown_seconds,
      auto_scale_out_cooldown_seconds,
  )
  if all(value is None for value in requested):
    return None

  # Explicitly disabling auto-scale resets the config entirely; any other
  # requested settings are discarded.
  if auto_scale_enabled is not None and not auto_scale_enabled:
    return sql_messages.ReadPoolAutoScaleConfig(enabled=False)

  scale_config = current_config or sql_messages.ReadPoolAutoScaleConfig()
  if auto_scale_enabled:
    scale_config.enabled = True
  if auto_scale_min_node_count is not None:
    scale_config.minNodeCount = auto_scale_min_node_count
  if auto_scale_max_node_count is not None:
    scale_config.maxNodeCount = auto_scale_max_node_count
  if auto_scale_target_metrics is not None:
    scale_config.targetMetrics = [
        sql_messages.TargetMetric(metric=metric_name, targetValue=target)
        for metric_name, target in auto_scale_target_metrics.items()
    ]
  if auto_scale_disable_scale_in is not None:
    scale_config.disableScaleIn = auto_scale_disable_scale_in
  if auto_scale_in_cooldown_seconds is not None:
    scale_config.scaleInCooldownSeconds = auto_scale_in_cooldown_seconds
  if auto_scale_out_cooldown_seconds is not None:
    scale_config.scaleOutCooldownSeconds = auto_scale_out_cooldown_seconds
  return scale_config


def _CustomMachineTypeString(cpu, memory_mib):
  """Creates a custom machine type from the CPU and memory specs.

  Args:
    cpu: the number of cpu desired for the custom machine type
    memory_mib: the amount of ram desired in MiB for the custom machine type
      instance

  Returns:
    The custom machine type name for the 'instance create' call
  """
  machine_type = 'db-custom-{0}-{1}'.format(cpu, memory_mib)
  return machine_type


def MachineType(instance=None, tier=None, memory=None, cpu=None):
  """Resolves the machine type for the instance.

  Adapted from compute.

  Args:
    instance: sql_messages.DatabaseInstance, The original instance, if it might
      be needed to generate the machine type.
    tier: string, the v1 or v2 tier.
    memory: string, the amount of memory.
    cpu: int, the number of CPUs.

  Returns:
    A string representing the URL naming a machine-type.

  Raises:
    exceptions.RequiredArgumentException when only one of the two custom
        machine type flags are used, or when none of the flags are used.
    exceptions.InvalidArgumentException when both the tier and
        custom machine type flags are used to generate a new instance.
  """
  machine_type = tier if tier else None

  # A custom machine type requires both CPU and memory, and excludes --tier.
  if cpu or memory:
    if not cpu:
      raise exceptions.RequiredArgumentException(
          '--cpu', 'Both [--cpu] and [--memory] must be '
          'set to create a custom machine type instance.')
    if not memory:
      raise exceptions.RequiredArgumentException(
          '--memory', 'Both [--cpu] and [--memory] must '
          'be set to create a custom machine type instance.')
    if tier:
      raise exceptions.InvalidArgumentException(
          '--tier', 'Cannot set both [--tier] and '
          '[--cpu]/[--memory] for the same instance.')
    # Memory arrives in bytes; the machine type name encodes MiB.
    machine_type = 'db-custom-{0}-{1}'.format(cpu, memory // (2**20))

  # Creating a new instance with no flags falls back to the default type.
  if not machine_type and not instance:
    machine_type = constants.DEFAULT_MACHINE_TYPE

  return machine_type


def OnPremisesConfiguration(sql_messages, source_ip_address, source_port):
  """Builds the external primary configuration for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    source_ip_address: string, the IP address of the external data source.
    source_port: number, the port number of the external data source.

  Returns:
    sql_messages.OnPremisesConfiguration object.
  """
  host_port = '{}:{}'.format(source_ip_address, source_port)
  return sql_messages.OnPremisesConfiguration(
      kind='sql#onPremisesConfiguration', hostPort=host_port)


def PrivateNetworkUrl(network):
  """Resolves a network ID to the self-link URL of that private network.

  Args:
    network: The ID of the network.

  Returns:
    string, the URL of the network.
  """
  sql_client = common_api_util.SqlClient(common_api_util.API_VERSION_DEFAULT)
  # The project defaults to the active gcloud configuration's project.
  parse_params = {'project': properties.VALUES.core.project.GetOrFail}
  network_ref = sql_client.resource_parser.Parse(
      network, params=parse_params, collection='compute.networks')
  return network_ref.SelfLink()


def ReplicaConfiguration(sql_messages,
                         primary_username,
                         primary_password,
                         primary_dump_file_path,
                         primary_ca_certificate_path=None,
                         client_certificate_path=None,
                         client_key_path=None):
  """Builds the replica config for an external primary.

  Args:
    sql_messages: module, The messages module that should be used.
    primary_username: The username for connecting to the external instance.
    primary_password: The password for connecting to the external instance.
    primary_dump_file_path: ObjectReference, a wrapper for the URI of the Cloud
      Storage path containing the dumpfile to seed the replica with.
    primary_ca_certificate_path: The path to the CA certificate PEM file.
    client_certificate_path: The path to the client certificate PEM file.
    client_key_path: The path to the client private key PEM file.

  Returns:
    sql_messages.ReplicaConfiguration object.
  """
  mysql_config = sql_messages.MySqlReplicaConfiguration(
      kind='sql#mysqlReplicaConfiguration',
      username=primary_username,
      password=primary_password,
      dumpFilePath=primary_dump_file_path.ToUrl())
  # Optional TLS material is loaded from local PEM files when supplied.
  certificate_fields = (
      ('caCertificate', primary_ca_certificate_path),
      ('clientCertificate', client_certificate_path),
      ('clientKey', client_key_path),
  )
  for field, pem_path in certificate_fields:
    if pem_path:
      setattr(mysql_config, field, files.ReadFileContents(pem_path))
  return sql_messages.ReplicaConfiguration(
      kind='sql#demoteMasterMysqlReplicaConfiguration',
      mysqlReplicaConfiguration=mysql_config)


def Region(specified_region, gce_zone, secondary_zone=None):
  """Resolves the region for the instance.

  Args:
    specified_region: string, the GCE region to create the instance in.
    gce_zone: string, the GCE zone to create the instance in.
    secondary_zone: string, the GCE zone to create the standby instance in.

  Returns:
    string, the region to create the instance in.
  """
  # When both zones are given, they must resolve to the same region.
  if gce_zone and secondary_zone:
    if (api_util.GetRegionFromZone(gce_zone) !=
        api_util.GetRegionFromZone(secondary_zone)):
      raise exceptions.ConflictingArgumentsException(
          'Zones in arguments --zone and --secondary-zone '
          'belong to different regions.')
  # --zone takes precedence over an explicitly specified region.
  if gce_zone:
    return api_util.GetRegionFromZone(gce_zone)
  return specified_region


def _ParseComplexity(sql_messages, complexity):
  if complexity:
    return sql_messages.PasswordValidationPolicy.ComplexityValueValuesEnum.lookup_by_name(
        complexity.upper())
  return None


def PasswordPolicy(
    sql_messages,
    password_policy_min_length=None,
    password_policy_complexity=None,
    password_policy_reuse_interval=None,
    password_policy_disallow_username_substring=None,
    password_policy_password_change_interval=None,
    enable_password_policy=None,
    clear_password_policy=None,
):
  """Builds or clears the password validation policy for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    password_policy_min_length: int, Minimum number of characters allowed.
    password_policy_complexity: string, The complexity of the password.
    password_policy_reuse_interval: int, Number of previous passwords that
      cannot be reused.
    password_policy_disallow_username_substring: boolean, True if disallow
      username as a part of the password.
    password_policy_password_change_interval: duration, Minimum interval at
      which password can be changed.
    enable_password_policy: boolean, True if password validation policy is
      enabled.
    clear_password_policy: boolean, True if clear existing password policy.

  Returns:
    sql_messages.PasswordValidationPolicy or None
  """
  policy_flags = (
      password_policy_min_length,
      password_policy_complexity,
      password_policy_reuse_interval,
      password_policy_disallow_username_substring,
      password_policy_password_change_interval,
      enable_password_policy,
  )
  # Clearing wins over any supplied policy fields.
  if clear_password_policy or all(flag is None for flag in policy_flags):
    return None

  policy = sql_messages.PasswordValidationPolicy()
  if password_policy_min_length is not None:
    policy.minLength = password_policy_min_length
  if password_policy_complexity is not None:
    policy.complexity = _ParseComplexity(sql_messages,
                                         password_policy_complexity)
  if password_policy_reuse_interval is not None:
    policy.reuseInterval = password_policy_reuse_interval
  if password_policy_disallow_username_substring is not None:
    policy.disallowUsernameSubstring = (
        password_policy_disallow_username_substring)
  if password_policy_password_change_interval is not None:
    # Durations are serialized for the API as '<seconds>s'.
    policy.passwordChangeInterval = '{}s'.format(
        password_policy_password_change_interval)
  if enable_password_policy is not None:
    policy.enablePasswordPolicy = enable_password_policy

  return policy


def PscAutoConnections(
    sql_messages,
    psc_auto_connections=None,
):
  """Generates PSC auto connections for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    psc_auto_connections: list of dicts describing the allowed consumer
      projects and networks, or None when the flag was not supplied.

  Returns:
    list of sql_messages.PscAutoConnectionConfig objects (empty when no
    connections were supplied)

  Raises:
    exceptions.InvalidArgumentException when there is no valid network or
    project specified.
  """
  # BUGFIX: the default is None, and iterating None raised TypeError; treat
  # a missing/empty argument as "no connections".
  if not psc_auto_connections:
    return []

  updated_psc_auto_connections = []
  for connection in psc_auto_connections:
    current_psc_auto_connection = sql_messages.PscAutoConnectionConfig()
    current_psc_auto_connection.consumerNetwork = connection.get('network')

    if project := connection.get('project'):
      current_psc_auto_connection.consumerProject = project
    else:
      # Without an explicit project, derive it from the network's relative
      # resource name.
      client = common_api_util.SqlClient(common_api_util.API_VERSION_DEFAULT)
      network_ref = client.resource_parser.ParseRelativeName(
          current_psc_auto_connection.consumerNetwork,
          collection='compute.networks',
      )
      current_psc_auto_connection.consumerProject = network_ref.project

    if (
        current_psc_auto_connection.consumerProject
        and current_psc_auto_connection.consumerNetwork
    ):
      updated_psc_auto_connections.append(current_psc_auto_connection)
    else:
      raise exceptions.InvalidArgumentException(
          '--psc-auto-connections', 'PSC auto connection must have network '
          'specified.'
      )
  return updated_psc_auto_connections


def UncMappings(sql_messages, unc_mappings=None, clear_unc_mappings=False):
  """Generates the UNC mappings for the instance.

  Each input mapping is a string of the form 'unc-path=gcs-path:mode', where
  mode names a value of sql_messages.UncMapping.ModeValueValuesEnum.

  Args:
    sql_messages: module, The messages module that should be used.
    unc_mappings: input unc mappings from the user.
    clear_unc_mappings: boolean, True if unc mappings should be cleared.

  Returns:
    list of sql_messages.UncMapping objects

  Raises:
    ValueError: an input string is malformed or names an unknown mode.
  """
  updated_mappings = []
  if unc_mappings:
    for mapping_str in unc_mappings:
      try:
        # Split off the mode at the last ':' so the paths may contain colons.
        rest, mode_str = mapping_str.rsplit(':', 1)
        unc_path, gcs_path = rest.split('=', 1)
        updated_mappings.append(
            sql_messages.UncMapping(
                uncPath=unc_path,
                gcsPath=gcs_path,
                mode=sql_messages.UncMapping.ModeValueValuesEnum.lookup_by_name(
                    mode_str.upper()
                ),
            )
        )
      except ValueError as e:
        raise ValueError(
            "Invalid UNC mapping input. Expected 'unc-path=gcs-path:mode'."
        ) from e
      except KeyError as e:
        raise ValueError(
            "Invalid mode. Expected 'snapshot_read' or 'snapshot_write'."
        ) from e
  elif clear_unc_mappings:
    updated_mappings = []

  return updated_mappings


def FinalBackupConfiguration(
    sql_messages,
    instance=None,
    final_backup_enabled=None,
    final_backup_retention_days=None,
):
  """Builds the final backup configuration for the instance.

  Args:
    sql_messages: module, The messages module that should be used.
    instance: sql_messages.DatabaseInstance, the original instance, if the
      previous state is needed.
    final_backup_enabled: boolean, True if final backup should be enabled.
    final_backup_retention_days: int, how many days to retain the final backup.

  Returns:
    sql_messages.FinalBackupConfiguration object, or None

  Raises:
    sql_exceptions.ArgumentError: Bad combination of arguments.
  """
  if final_backup_enabled is None and final_backup_retention_days is None:
    return None

  explicitly_disabled = (final_backup_enabled is not None and
                         not final_backup_enabled)
  # Retention days cannot accompany an explicit disable.
  if explicitly_disabled and final_backup_retention_days is not None:
    raise sql_exceptions.ArgumentError(
        'You cannot set final-backup-retention-days while final-backup field is disabled.'
    )

  # Update the instance's existing config in place when one exists.
  if instance and instance.settings.finalBackupConfig:
    final_backup_config = instance.settings.finalBackupConfig
  else:
    final_backup_config = sql_messages.FinalBackupConfig()

  if final_backup_enabled is not None:
    final_backup_config.enabled = final_backup_enabled
  if final_backup_retention_days is not None:
    final_backup_config.retentionDays = final_backup_retention_days
    # Specifying retention implies the final backup is wanted.
    final_backup_config.enabled = True

  if explicitly_disabled:
    # A disabled final backup keeps no retention setting.
    final_backup_config.retentionDays = None

  return final_backup_config