HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/bigtable/authorized_views.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Bigtable authorized views API helper."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import base64
import binascii
import copy
import io
import json
import textwrap

from apitools.base.py import encoding
from apitools.base.py import exceptions as api_exceptions
from googlecloudsdk.api_lib.bigtable import util
from googlecloudsdk.api_lib.util import exceptions
from googlecloudsdk.calliope import exceptions as calliope_exceptions
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.resource import resource_diff
from googlecloudsdk.core.util import edit
import six

CREATE_HELP = textwrap.dedent("""\
    To create an authorized view, specify a JSON or YAML formatted
    representation of a valid authorized view protobuf.
    Lines beginning with "#" are ignored.

    Example:
    {
      "subsetView":
      {
        "rowPrefixes": ["store1#"],
        "familySubsets":
        {
          "column_family_name":
          {
            "qualifiers":["address"],
            "qualifierPrefixes":["tel"]
          }
        }
      },
      "deletionProtection": true
    }
  """)

UPDATE_HELP = textwrap.dedent("""\
    Please pecify a JSON or YAML formatted representation of the new authorized
    view. Lines beginning with "#" are ignored.

    Example:
    {
      "subsetView":
      {
        "rowPrefixes": ["store1#"],
        "familySubsets":
        {
          "column_family_name":
          {
            "qualifiers":["address"],
            "qualifierPrefixes":["tel"]
          }
        }
      },
      "deletionProtection": true
    }

    Current authorized view:
  """)


def ModifyCreateAuthorizedViewRequest(unused_ref, args, req):
  """Parse argument and construct create authorized view request.

  Args:
    unused_ref: the gcloud resource (unused).
    args: input arguments.
    req: the real request to be sent to backend service.

  Returns:
    The real request to be sent to backend service.
  """
  if args.definition_file:
    req.authorizedView = ParseAuthorizedViewFromYamlOrJsonDefinitionFile(
        args.definition_file, args.pre_encoded
    )
  else:
    # If no definition_file is provided, EDITOR will be executed with a
    # commented prompt for the user to fill out the authorized view definition.
    req.authorizedView = PromptForAuthorizedViewDefinition(
        is_create=True, pre_encoded=args.pre_encoded
    )

  # The name field should be ignored and omitted from the request as it is
  # taken from the command line.
  req.authorizedView.name = None

  # By specifying the request_id_field for the authorized view resource in the
  # declarative yaml file, the req.authorizedViewId and the req.parent will be
  # automatically mapped.

  return req


def PromptForAuthorizedViewDefinition(
    is_create, pre_encoded, current_authorized_view=None
):
  """Prompt user to fill out a JSON/YAML format representation of an authorized view.

  Returns the parsed authorized view proto message from user's response.

  Args:
    is_create: True if the prompt is for creating an authorized view. False if
      the prompt is for updating an authorized view.
    pre_encoded: True if all binary fields in the authorized view definition are
      already Base64-encoded. We skip the step of applying Base64 encoding in
      this case.
    current_authorized_view: The current authorized view definition. Only used
      in the update case to be included as part of the initial commented prompt.

  Returns:
    an authorized view proto message with fields filled accordingly.

  Raises:
    ChildProcessError if the user did not save the temporary file.
    ChildProcessError if there is a problem running the editor.
    ValueError if the user's response does not follow YAML or JSON format.
    ValueError if the YAML/JSON object cannot be parsed as a valid authorized
      View.
  """
  authorized_view_message_type = util.GetAdminMessages().AuthorizedView
  if is_create:
    help_text = BuildCreateAuthorizedViewFileContents()
  else:
    help_text = BuildUpdateAuthorizedViewFileContents(
        current_authorized_view, pre_encoded
    )
  try:
    content = edit.OnlineEdit(help_text)
  except edit.NoSaveException:
    raise ChildProcessError("Edit aborted by user.")
  except edit.EditorException as e:
    raise ChildProcessError(
        "There was a problem applying your changes. [{0}].".format(e)
    )
  try:
    authorized_view_to_parse = yaml.load(content)
    if not pre_encoded:
      Base64EncodingYamlAuthorizedViewDefinition(authorized_view_to_parse)
    authorized_view = encoding.PyValueToMessage(
        authorized_view_message_type, authorized_view_to_parse
    )
  except yaml.YAMLParseError as e:
    raise ValueError(
        "Provided response is not a properly formatted YAML or JSON file."
        " [{0}].".format(e)
    )
  except AttributeError as e:
    raise ValueError(
        "Provided response cannot be parsed as a valid authorized view. [{0}]."
        .format(e)
    )
  return authorized_view


def ParseAuthorizedViewFromYamlOrJsonDefinitionFile(file_path, pre_encoded):
  """Create an authorized view proto message from a YAML or JSON formatted definition file.

  Args:
    file_path: Path to the YAML or JSON definition file.
    pre_encoded: True if all binary fields in the authorized view definition are
      already Base64-encoded. We skip the step of applying Base64 encoding in
      this case.

  Returns:
    an authorized view proto message with fields filled accordingly.

  Raises:
    BadArgumentException if the file cannot be read.
    BadArgumentException if the file does not follow YAML or JSON format.
    ValueError if the YAML/JSON object cannot be parsed as a valid authorized
      view.
  """
  authorized_view_message_type = util.GetAdminMessages().AuthorizedView
  try:
    authorized_view_to_parse = yaml.load_path(file_path)
    if not pre_encoded:
      Base64EncodingYamlAuthorizedViewDefinition(authorized_view_to_parse)
    authorized_view = encoding.PyValueToMessage(
        authorized_view_message_type, authorized_view_to_parse
    )
  except (yaml.FileLoadError, yaml.YAMLParseError) as e:
    raise calliope_exceptions.BadArgumentException("--definition-file", e)
  except AttributeError as e:
    raise ValueError(
        "File [{0}] cannot be parsed as a valid authorized view. [{1}].".format(
            file_path, e
        )
    )
  return authorized_view


def Base64EncodingYamlAuthorizedViewDefinition(yaml_authorized_view):
  """Apply base64 encoding to all binary fields in the authorized view definition in YAML format."""
  if not yaml_authorized_view or "subsetView" not in yaml_authorized_view:
    return yaml_authorized_view
  yaml_subset_view = yaml_authorized_view["subsetView"]

  if "rowPrefixes" in yaml_subset_view:
    yaml_subset_view["rowPrefixes"] = [
        Utf8ToBase64(s) for s in yaml_subset_view.get("rowPrefixes", [])
    ]
  if "familySubsets" in yaml_subset_view:
    for family_name, family_subset in yaml_subset_view["familySubsets"].items():
      if "qualifiers" in family_subset:
        family_subset["qualifiers"] = [
            Utf8ToBase64(s) for s in family_subset.get("qualifiers", [])
        ]
      if "qualifierPrefixes" in family_subset:
        family_subset["qualifierPrefixes"] = [
            Utf8ToBase64(s) for s in family_subset.get("qualifierPrefixes", [])
        ]
      yaml_subset_view["familySubsets"][family_name] = family_subset
  return yaml_authorized_view


def Base64DecodingYamlAuthorizedViewDefinition(yaml_authorized_view):
  """Apply base64 decoding to all binary fields in the authorized view definition in YAML format."""
  if not yaml_authorized_view or "subsetView" not in yaml_authorized_view:
    return yaml_authorized_view
  yaml_subset_view = yaml_authorized_view["subsetView"]

  if "rowPrefixes" in yaml_subset_view:
    yaml_subset_view["rowPrefixes"] = [
        Base64ToUtf8(s) for s in yaml_subset_view.get("rowPrefixes", [])
    ]
  if "familySubsets" in yaml_subset_view:
    for family_name, family_subset in yaml_subset_view["familySubsets"].items():
      if "qualifiers" in family_subset:
        family_subset["qualifiers"] = [
            Base64ToUtf8(s) for s in family_subset.get("qualifiers", [])
        ]
      if "qualifierPrefixes" in family_subset:
        family_subset["qualifierPrefixes"] = [
            Base64ToUtf8(s) for s in family_subset.get("qualifierPrefixes", [])
        ]
      yaml_subset_view["familySubsets"][family_name] = family_subset
  return yaml_authorized_view


def Utf8ToBase64(s):
  """Encode a utf-8 string as a base64 string."""
  return six.ensure_text(base64.b64encode(six.ensure_binary(s)))


def Base64ToUtf8(s):
  """Decode a base64 string as a utf-8 string."""
  try:
    return six.ensure_text(base64.b64decode(s))
  except (TypeError, binascii.Error) as error:
    raise ValueError(
        "Error decoding base64 string [{0}] in the current authorized view"
        " definition into utf-8. [{1}].".format(s, error)
    )


def CheckOnlyAsciiCharactersInAuthorizedView(authorized_view):
  """Raises a ValueError if the view contains non-ascii characters."""
  if authorized_view is None or authorized_view.subsetView is None:
    return
  subset_view = authorized_view.subsetView

  if subset_view.rowPrefixes is not None:
    for row_prefix in subset_view.rowPrefixes:
      CheckAscii(row_prefix)

  if subset_view.familySubsets is not None:
    for additional_property in subset_view.familySubsets.additionalProperties:
      family_subset = additional_property.value
      for qualifier in family_subset.qualifiers:
        CheckAscii(qualifier)
      for qualifier_prefix in family_subset.qualifierPrefixes:
        CheckAscii(qualifier_prefix)


def CheckAscii(s):
  """Check if a string is ascii."""
  try:
    s.decode("ascii")
  except UnicodeError as error:
    raise ValueError(
        "Non-ascii characters [{0}] found in the current authorized view"
        " definition, please use --pre-encoded instead. [{1}].".format(s, error)
    )


def BuildCreateAuthorizedViewFileContents():
  """Builds the help text for creating an authorized view as the initial file content."""
  buf = io.StringIO()
  for line in CREATE_HELP.splitlines():
    buf.write("#")
    if line:
      buf.write(" ")
    buf.write(line)
    buf.write("\n")
  buf.write("\n")
  return buf.getvalue()


def ModifyUpdateAuthorizedViewRequest(original_ref, args, req):
  """Parse argument and construct update authorized view request.

  Args:
    original_ref: the gcloud resource.
    args: input arguments.
    req: the real request to be sent to backend service.

  Returns:
    The real request to be sent to backend service.
  """
  current_authorized_view = None
  if args.definition_file:
    req.authorizedView = ParseAuthorizedViewFromYamlOrJsonDefinitionFile(
        args.definition_file, args.pre_encoded
    )
  else:
    # If no definition_file is provided, EDITOR will be executed with a
    # commented prompt for the user to fill out the authorized view definition.
    current_authorized_view = GetCurrentAuthorizedView(
        original_ref.RelativeName(), not args.pre_encoded
    )
    req.authorizedView = PromptForAuthorizedViewDefinition(
        is_create=False,
        pre_encoded=args.pre_encoded,
        current_authorized_view=current_authorized_view,
    )

  if req.authorizedView.subsetView is not None:
    req = AddFieldToUpdateMask("subset_view", req)
  if req.authorizedView.deletionProtection is not None:
    req = AddFieldToUpdateMask("deletion_protection", req)

  if args.interactive:
    if current_authorized_view is None:
      current_authorized_view = GetCurrentAuthorizedView(
          original_ref.RelativeName(), check_ascii=False
      )

    # This essentially merges the requested authorized view to the original
    # authorized view based on the update mask.
    new_authorized_view = copy.deepcopy(current_authorized_view)
    if req.authorizedView.subsetView is not None:
      new_authorized_view.subsetView = req.authorizedView.subsetView
    if req.authorizedView.deletionProtection is not None:
      new_authorized_view.deletionProtection = (
          req.authorizedView.deletionProtection
      )

    # Get the diff between the original authorized view and the new authorized
    # view.
    buf = io.StringIO()
    differ = resource_diff.ResourceDiff(
        original=current_authorized_view, changed=new_authorized_view
    )
    differ.Print("default", out=buf)
    if buf.getvalue():
      console_io.PromptContinue(
          message=(
              "Difference between the current authorized view and the new"
              " authorized view:\n"
          )
          + buf.getvalue(),
          cancel_on_no=True,
      )
    else:
      console_io.PromptContinue(
          message="The authorized view will NOT change with this update.",
          cancel_on_no=True,
      )

  # The name field should be ignored and omitted from the request as it is
  # taken from the command line.
  req.authorizedView.name = None
  if args.ignore_warnings:
    req.ignoreWarnings = True

  return req


def GetCurrentAuthorizedView(authorized_view_name, check_ascii):
  """Get the current authorized view resource object given the authorized view name.

  Args:
    authorized_view_name: The name of the authorized view.
    check_ascii: True if we should check to make sure that the returned
      authorized view contains only ascii characters.

  Returns:
    The view resource object.

  Raises:
    ValueError if check_ascii is true and the current authorized view definition
    contains invalid non-ascii characters.
  """
  client = util.GetAdminClient()
  request = util.GetAdminMessages().BigtableadminProjectsInstancesTablesAuthorizedViewsGetRequest(
      name=authorized_view_name
  )
  try:
    authorized_view = client.projects_instances_tables_authorizedViews.Get(
        request
    )
    if check_ascii:
      CheckOnlyAsciiCharactersInAuthorizedView(authorized_view)
    return authorized_view
  except api_exceptions.HttpError as error:
    raise exceptions.HttpException(error)


def SerializeToJsonOrYaml(
    authorized_view, pre_encoded, serialized_format="json"
):
  """Serializes a authorized view protobuf to either JSON or YAML."""
  authorized_view_dict = encoding.MessageToDict(authorized_view)
  if not pre_encoded:
    authorized_view_dict = Base64DecodingYamlAuthorizedViewDefinition(
        authorized_view_dict
    )
  if serialized_format == "json":
    return six.text_type(json.dumps(authorized_view_dict, indent=2))
  if serialized_format == "yaml":
    return six.text_type(yaml.dump(authorized_view_dict))


def BuildUpdateAuthorizedViewFileContents(current_authorized_view, pre_encoded):
  """Builds the help text for updating an existing authorized view.

  Args:
    current_authorized_view: The current authorized view resource object.
    pre_encoded: When pre_encoded is False, user is passing utf-8 values for
      binary fields in the authorized view definition and expecting us to apply
      base64 encoding. Thus, we should also display utf-8 values in the help
      text, which requires base64 decoding the binary fields in the
      `current_authorized_view`.

  Returns:
    A string containing the help text for update authorized view.
  """
  buf = io.StringIO()
  for line in UPDATE_HELP.splitlines():
    buf.write("#")
    if line:
      buf.write(" ")
    buf.write(line)
    buf.write("\n")

  serialized_original_authorized_view = SerializeToJsonOrYaml(
      current_authorized_view, pre_encoded
  )
  for line in serialized_original_authorized_view.splitlines():
    buf.write("#")
    if line:
      buf.write(" ")
    buf.write(line)
    buf.write("\n")
  buf.write("\n")
  return buf.getvalue()


def AddFieldToUpdateMask(field, req):
  """Adding a new field to the update mask of the UpdateAuthorizedViewRequest.

  Args:
    field: the field to be updated.
    req: the original UpdateAuthorizedViewRequest.

  Returns:
    req: the UpdateAuthorizedViewRequest with update mask refreshed.
  """
  update_mask = req.updateMask
  if update_mask:
    if update_mask.count(field) == 0:
      req.updateMask = update_mask + "," + field
  else:
    req.updateMask = field
  return req