File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/managed_kafka/util.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A library used to support Managed Service for Apache Kafka commands."""
import re
from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk import core
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
# Retrieve all message type for conversions from gcloud primitives to
# apitool types.
_MESSAGE = apis.GetMessagesModule("managedkafka", "v1")
SASL_PORT = "9092"
MTLS_PORT = "9192"
CONTEXTS_RESOURCE_PATH = "/contexts/"
SUBJECTS_RESOURCE_PATH = "/subjects/"
SUBJECTS_MODE_RESOURCE_PATH = "/mode/"
SUBJECTS_CONFIG_RESOURCE_PATH = "/config/"
def ValidateCPU(cpu):
"""Validate CPU >= 3."""
if cpu < 3:
raise exceptions.BadArgumentException("--cpu", "CPU must be at least 3")
return cpu
def PrepareUpdateWithSubnets(_, args, request):
"""Prepare the update request with the information from the subnet flag.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
The updated request with the subnet.
"""
if not args.subnets:
return request
# The cluster is not created yet if only the subnet flag is set. This is
# because we don't formally map the subnet to the request the same way we do
# for other flags. Instead, subnets require special handling with the use of
# hooks.
if not request.cluster:
request.cluster = {}
subnet_update_mask = "gcpConfig.accessConfig.networkConfigs"
request.updateMask = AppendUpdateMask(request.updateMask, subnet_update_mask)
return MapSubnetsToNetworkConfig(_, args, request)
def PrepareUpdateWithCaPools(_, args, request):
"""Prepare the update request with the information from the mTLS CA pool flag.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
The updated request with the CA pool.
"""
# If the user is clearing the CA pools, we need to add the update mask.
# This flag is guarded by a mutex and will not conflict with the ca pool flag.
if args.clear_mtls_ca_pools:
request.updateMask = AppendUpdateMask(
request.updateMask, "tlsConfig.trustConfig.casConfigs"
)
return request
# If the there are no CA pools to update, return the request as is.
if not args.mtls_ca_pools:
return request
# The cluster is not created yet if only the CA pool flag is set. This is
# because we don't formally map the CA pool to the request the same way we do
# for other flags. Instead, CA pools require special handling with the use of
# hooks.
if not request.cluster:
request.cluster = {}
ca_pool_update_mask = "tlsConfig.trustConfig.casConfigs"
request.updateMask = AppendUpdateMask(request.updateMask, ca_pool_update_mask)
return MapCaPoolsToCASConfig(_, args, request)
def AppendUpdateMask(update_mask, new_mask):
"""Handles appending a new mask to an existing mask.
Args:
update_mask: the existing update mask.
new_mask: the new mask to append.
Returns:
The fully appended update mask.
"""
update_mask = f"{update_mask},{new_mask}"
return update_mask if update_mask[0] != "," else update_mask[1:]
def MapSubnetsToNetworkConfig(_, args, request):
"""Maps the list of subnets from the flag to the API fields in the request.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
The updated request with networkConfig in the JSON format.
"""
# Reference the existing GCP config if already created for the request.
if not request.cluster.gcpConfig:
request.cluster.gcpConfig = {}
request.cluster.gcpConfig.accessConfig = {"networkConfigs": []}
for subnet in args.subnets:
network_config = {"subnet": subnet}
request.cluster.gcpConfig.accessConfig.networkConfigs.append(
encoding.DictToMessage(network_config, _MESSAGE.NetworkConfig)
)
return request
def MapCaPoolsToCASConfig(_, args, request):
"""Maps the list of CA pools from the flag to the API fields in the request.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
The updated request with CertificateAuthorityServiceConfig in the JSON
format.
"""
if not args.mtls_ca_pools:
return request
# Reference the existing CAS config if already created for the request.
if not request.cluster.tlsConfig:
request.cluster.tlsConfig = {}
request.cluster.tlsConfig.trustConfig = {"casConfigs": []}
for ca_pool in args.mtls_ca_pools:
cas_config = {"caPool": ca_pool}
request.cluster.tlsConfig.trustConfig.casConfigs.append(
encoding.DictToMessage(
cas_config, _MESSAGE.CertificateAuthorityServiceConfig
)
)
return request
def ListWithBootstrapAddr(response, _):
"""Synthesizes the bootstrap address to the response for a list request.
Args:
response: the payload to return.
_: list of flags.
Returns:
The updated clusters with the bootstrap.
"""
return [
SynthesizeBootstrapAddr(cluster, cluster.name) for cluster in response
]
def DescribeWithBootstrapAddr(response, _):
"""Synthesizes the bootstrap address to the response for a describe request.
Args:
response: the payload to return.
_: list of flags.
Returns:
The updated cluster with the bootstrap.
"""
return SynthesizeBootstrapAddr(response, response.name)
def SynthesizeBootstrapAddr(response, cluster):
"""Synthesizes the bootstrap address to the response.
Args:
response: the payload to update.
cluster: the fully qualifed name of the cluster.
Returns:
The updated cluster with the bootstrap
"""
# The fully qualified name will always be consistent. We also have to use the
# fully qualifed name instead of the resource directly to support both
# `describe` and `list`.
name = cluster.split("/")[5]
location = cluster.split("/")[3]
project = core.properties.VALUES.core.project.Get()
domain_prefixed_project = project.split(":")
if len(domain_prefixed_project) == 2:
project = f"{domain_prefixed_project[1]}.{domain_prefixed_project[0]}"
bootstrap = f"bootstrap.{name}.{location}.managedkafka.{project}.cloud.goog"
synthesized = core.resource.resource_projector.MakeSerializable(response)
synthesized["bootstrapAddress"] = f"{bootstrap}:{SASL_PORT}"
if hasattr(response, "tlsConfig") and response.tlsConfig:
synthesized["bootstrapAddressMTLS"] = f"{bootstrap}:{MTLS_PORT}"
return synthesized
def UpdateTopics(_, args, request):
"""Load the topics JSON from the argument to the request.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
The updated request with topics.
"""
topics = core.yaml.load(args.topics_file)
request.consumerGroup = {
"topics": encoding.DictToMessage(
topics, _MESSAGE.ConsumerGroup.TopicsValue
)
}
request.updateMask = "topics"
return request
def MapConnectParamsToNetworkConfig(_, args, request):
"""Maps subnets and DNS names to the network config API field.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
The updated request with networkConfig in the JSON format.
"""
# If no network config flags are provided (such as in the case of an update),
# we don't need to create a network config.
if not args.primary_subnet and not args.dns_name:
return request
# Reference the existing GCP config if already created for the request.
if not request.connectCluster.gcpConfig:
request.connectCluster.gcpConfig = {}
request.connectCluster.gcpConfig.accessConfig = {"networkConfigs": []}
# Subnets may not be provided during update
if not args.primary_subnet:
network_config = {"dns_domain_names": []}
else:
network_config = {
"primary_subnet": args.primary_subnet,
"additional_subnets": [],
"dns_domain_names": [],
}
if not args.additional_subnet:
args.additional_subnet = []
network_config["additional_subnets"] = list(args.additional_subnet)
if not args.dns_name:
args.dns_name = []
network_config["dns_domain_names"] = list(args.dns_name)
request.connectCluster.gcpConfig.accessConfig.networkConfigs.append(
encoding.DictToMessage(network_config, _MESSAGE.ConnectNetworkConfig)
)
if isinstance(
# (if the request is an update)
request,
_MESSAGE.ManagedkafkaProjectsLocationsConnectClustersPatchRequest,
):
request.updateMask = re.sub(
r"gcpConfig\.accessConfig\.networkConfigs\.dnsDomainNames",
"gcpConfig.accessConfig.networkConfigs",
request.updateMask,
)
request.updateMask = re.sub(
r"gcpConfig\.accessConfig\.networkConfigs\.primarySubnet",
"gcpConfig.accessConfig.networkConfigs",
request.updateMask,
)
request.updateMask = re.sub(
r"gcpConfig\.accessConfig\.networkConfigs\.additionalSubnets",
"gcpConfig.accessConfig.networkConfigs",
request.updateMask,
)
return request
def PrepareConnectClusterCreate(_, args, request):
"""Load the config JSON from the argument to the request and build the kafka cluster resource path.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
"""
if args.config_file:
config = core.yaml.load(args.config_file)
request.connectCluster.config = encoding.DictToMessage(
config, _MESSAGE.ConnectCluster.ConfigValue
)
project = args.project or core.properties.VALUES.core.project.Get()
# If the user provides the full path, we don't need to build it.
kafka_cluster_path = args.kafka_cluster
if not re.match(r"projects/.+/locations/.+/clusters/.+", args.kafka_cluster):
location = args.location or args.connect_cluster.split("/")[3]
kafka_cluster_path = (
f"projects/{project}/locations/{location}/clusters/{args.kafka_cluster}"
)
request.connectCluster.kafkaCluster = kafka_cluster_path
return request
def PrepareConnectClusterUpdate(_, args, request):
"""Map the update flags to the request and update mask.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
"""
if args.config_file:
config = core.yaml.load(args.config_file)
request.connectCluster.config = encoding.DictToMessage(
config, _MESSAGE.ConnectCluster.ConfigValue
)
request.updateMask = AppendUpdateMask(request.updateMask, "config")
if args.clear_configs:
request.updateMask = AppendUpdateMask(request.updateMask, "config")
if args.clear_dns_names:
request.updateMask = AppendUpdateMask(
request.updateMask,
"gcpConfig.accessConfig.networkConfigs.dnsDomainNames",
)
if args.clear_secrets:
request.updateMask = AppendUpdateMask(
request.updateMask, "gcpConfig.secretPaths"
)
if args.clear_labels:
request.updateMask = AppendUpdateMask(request.updateMask, "labels")
return request
def ConnectorCreateReadConfigAndTaskRestartPolicy(_, args, request):
"""Load the config JSON from the argument to the request.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
"""
if not request.connector:
request.connector = {}
if args.config_file:
config = core.yaml.load(args.config_file)
request.connector.configs = encoding.DictToMessage(
config, _MESSAGE.Connector.ConfigsValue
)
# Gcloud's duration formatting doesn't seem to play nice with the protobuf
# duration parsing, so we convert to seconds and append the unit suffix here.
task_restart_policy_dict = {}
if args.task_restart_max_backoff:
task_restart_policy_dict["maximum_backoff"] = (
str(args.task_restart_max_backoff.total_seconds) + "s"
)
if args.task_restart_min_backoff:
task_restart_policy_dict["minimum_backoff"] = (
str(args.task_restart_min_backoff.total_seconds) + "s"
)
if task_restart_policy_dict:
request.connector.taskRestartPolicy = encoding.DictToMessage(
task_restart_policy_dict,
_MESSAGE.TaskRetryPolicy,
)
return request
def ConnectorUpdateReadConfigAndTaskRestartPolicy(_, args, request):
"""Load the config JSON from the argument to the request, and parse out the task restart policy.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
"""
if not request.connector:
request.connector = {}
if args.config_file:
config = core.yaml.load(args.config_file)
request.connector.configs = encoding.DictToMessage(
config, _MESSAGE.Connector.ConfigsValue
)
request.updateMask = AppendUpdateMask(request.updateMask, "configs")
# Gcloud's duration formatting doesn't seem to play nice with the protobuf
# duration parsing, so we convert to seconds and append the unit suffix here.
task_restart_policy_dict = {}
if args.task_restart_max_backoff:
task_restart_policy_dict["maximum_backoff"] = (
str(args.task_restart_max_backoff.total_seconds) + "s"
)
if args.task_restart_min_backoff:
task_restart_policy_dict["minimum_backoff"] = (
str(args.task_restart_min_backoff.total_seconds) + "s"
)
if task_restart_policy_dict:
request.connector.taskRestartPolicy = encoding.DictToMessage(
task_restart_policy_dict,
_MESSAGE.TaskRetryPolicy,
)
return request
def PatchConfigs(_, args, request):
"""Unnest the configs dictionary to the update mask.
Args:
_: resource parameter required but unused variable.
args: list of flags.
request: the payload to return.
Returns:
The new update mask with the configs.
"""
if args.configs:
update_mask = request.updateMask.split(",")
update_mask.remove("configs")
configs_list = []
for key, _ in args.configs.items():
configs_list.append(f'configs["{key}"]')
request.updateMask = AppendUpdateMask(
",".join(update_mask), ",".join(configs_list)
)
# This flag is guarded with a mutex so it won't conflict with the above.
if args.clear_configs:
request.updateMask = AppendUpdateMask(request.updateMask, "configs")
return request
def ParseMode(mode) -> str:
"""Parse the mode enum to a string.
Args:
mode: The mode enum of the schema registry or subject.
Returns:
The mode string.
"""
if mode == _MESSAGE.SchemaMode.ModeValueValuesEnum.READWRITE:
return "READWRITE"
elif mode == _MESSAGE.SchemaMode.ModeValueValuesEnum.READONLY:
return "READONLY"
elif mode == _MESSAGE.SchemaMode.ModeValueValuesEnum.IMPORT:
return "IMPORT"
else:
return "NONE"
def ParseCompatibility(compatibility) -> str:
"""Parse the compatibility enum to a string.
Args:
compatibility: The compatibility enum of the schema registry or subject.
Returns:
The compatibility string.
"""
if (
compatibility
== _MESSAGE.SchemaConfig.CompatibilityValueValuesEnum.BACKWARD
):
return "BACKWARD"
elif (
compatibility
== _MESSAGE.SchemaConfig.CompatibilityValueValuesEnum.BACKWARD_TRANSITIVE
):
return "BACKWARD_TRANSITIVE"
elif (
compatibility
== _MESSAGE.SchemaConfig.CompatibilityValueValuesEnum.FORWARD
):
return "FORWARD"
elif (
compatibility
== _MESSAGE.SchemaConfig.CompatibilityValueValuesEnum.FORWARD_TRANSITIVE
):
return "FORWARD_TRANSITIVE"
elif compatibility == _MESSAGE.SchemaConfig.CompatibilityValueValuesEnum.FULL:
return "FULL"
elif (
compatibility
== _MESSAGE.SchemaConfig.CompatibilityValueValuesEnum.FULL_TRANSITIVE
):
return "FULL_TRANSITIVE"
else:
return "NONE"
def ParseProject(project_id=None):
return project_id or properties.VALUES.core.project.Get(required=True)
def DeleteSubjectMode(subject, subject_run_resource, context):
"""Called when the user runs gcloud managed-kafka schema-registries subject delete ...
Args:
subject: The subject of the attribute to delete.
subject_run_resource: The subject resource path.
context: The context of the schema registry if provided.
Returns:
The updated subject with its mode deleted.
"""
message = apis.GetMessagesModule("managedkafka", "v1")
client = apis.GetClientInstance("managedkafka", "v1")
schema_registry_resource = subject_run_resource
schema_registry_resource = (
f"{schema_registry_resource}{SUBJECTS_MODE_RESOURCE_PATH}{subject}"
)
# Check if context is provided.
if context:
log.status.Print("Deleting subject mode for [%s]." % subject)
request = message.ManagedkafkaProjectsLocationsSchemaRegistriesContextsModeDeleteRequest(
name=schema_registry_resource
)
try:
client.projects_locations_schemaRegistries_contexts_mode.Delete(
request=request
)
log.UpdatedResource(subject, details="mode. It is now unset.")
except apitools_exceptions.HttpNotFoundError as e:
api_error = exceptions.HttpException(e, HTTP_ERROR_FORMAT)
log.status.Print(api_error.message)
if "Resource not found" in api_error.message:
raise exceptions.HttpException(
e, error_format="Subject {} not found.".format(subject)
)
else:
log.status.Print("Deleting subject mode for [%s]." % subject)
request = (
message.ManagedkafkaProjectsLocationsSchemaRegistriesModeDeleteRequest(
name=schema_registry_resource
)
)
try:
client.projects_locations_schemaRegistries_mode.Delete(request=request)
log.UpdatedResource(subject, details="mode. It is now unset.")
except apitools_exceptions.HttpNotFoundError as e:
api_error = exceptions.HttpException(e, HTTP_ERROR_FORMAT)
log.status.Print(api_error.message)
if "Resource not found" in api_error.message:
raise exceptions.HttpException(
e, error_format="Subject {} not found.".format(subject)
)
def DeleteSubjectConfig(subject, schema_registry_resource, context):
"""Called when the user runs gcloud managed-kafka schema-registries subject delete ...
Args:
subject: The subject of the attribute to delete.
schema_registry_resource: The schema registry resource path.
context: The context of the schema registry if provided.
Returns:
The updated subject with its config deleted.
"""
message = apis.GetMessagesModule("managedkafka", "v1")
client = apis.GetClientInstance("managedkafka", "v1")
name = f"{schema_registry_resource}{SUBJECTS_CONFIG_RESOURCE_PATH}{subject}"
# Check if context is provided.
if context:
log.status.Print("Deleting subject config for [%s]." % subject)
request = message.ManagedkafkaProjectsLocationsSchemaRegistriesContextsConfigDeleteRequest(
name=name
)
try:
client.projects_locations_schemaRegistries_contexts_config.Delete(
request=request
)
log.UpdatedResource(subject, details="config. It is now unset.")
except apitools_exceptions.HttpNotFoundError as e:
api_error = exceptions.HttpException(e, HTTP_ERROR_FORMAT)
log.status.Print(api_error.message)
if "Resource not found" in api_error.message:
raise exceptions.HttpException(
e, error_format="Subject {} not found.".format(subject)
)
else:
log.status.Print("Deleting subject config for [%s]." % subject)
request = message.ManagedkafkaProjectsLocationsSchemaRegistriesConfigDeleteRequest(
name=name
)
try:
client.projects_locations_schemaRegistries_config.Delete(request=request)
log.UpdatedResource(subject, details="config. It is now unset.")
except apitools_exceptions.HttpNotFoundError as e:
api_error = exceptions.HttpException(e, HTTP_ERROR_FORMAT)
log.status.Print(api_error.message)
if "Resource not found" in api_error.message:
raise exceptions.HttpException(
e, error_format="Subject {} not found.".format(subject)
)
HTTP_ERROR_FORMAT = (
"ResponseError: code={status_code}, message={status_message}"
)