HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/core/yaml_validator.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""JSON schema YAML validator module.

Usage:

  # Get a validator for the JSON schema in the file schema_path.
  validator = yaml_validator.Validator(schema_path)
  # Validate parsed YAML data.
  validator.Validate(parsed_yaml_data)
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import io
import os

from googlecloudsdk.core import exceptions
from googlecloudsdk.core import yaml
from googlecloudsdk.core.util import pkg_resources

import jsonschema


class Error(exceptions.Error):
  """Errors for this module."""


class InvalidSchemaError(Error):
  """JSON schema is invalid."""


class InvalidSchemaVersionError(Error):
  """JSON schema version is invalid."""


class RefError(Error):
  """Ref error -- YAML $ref path not found."""


class ValidationError(Error):
  """Validation error -- YAML data does not match the schema.

  Attributes:
      message: A user-readable error message describing the validation error.
  """

  def __init__(self, error):
    super(ValidationError, self).__init__(error)
    self.message = error.message


class Validator(object):
  """JSON schema validator."""

  def __init__(self, schema_path):
    """"Initilaizes the schema and validator for schema_path.

    The validator resolves references to all other schemas in the directory
    of schema_path.

    Yes, it's really this ugly defining a validator with a resolver to
    pkg_resources resources.

    Raises:
      IOError: if schema not found in installed resources.
      files.Error: if schema file not found.
      SchemaError: if the schema is invalid.

    Args:
      schema_path: JSON schema file path.

    Returns:
      The schema to validate and the validator.
    """
    schema_dir = os.path.dirname(schema_path)

    class RefResolver(jsonschema.RefResolver):
      """$ref: resolver that consults pkg_resources."""

      @staticmethod
      def resolve_remote(ref):
        """pkg_resources $ref override -- schema_dir closure needed here."""
        path = os.path.join(schema_dir, ref)
        data = pkg_resources.GetResourceFromFile(path)
        try:
          schema = yaml.load(data)
        except Exception as e:  # pylint: disable=broad-except, avoid crash
          raise InvalidSchemaError(e)
        self.ValidateSchemaVersion(schema, path)
        return schema

    try:
      schema = yaml.load(pkg_resources.GetResourceFromFile(schema_path))
    except Exception as e:  # pylint: disable=broad-except, avoid crash
      raise InvalidSchemaError(e)
    self.ValidateSchemaVersion(schema, schema_path)
    resolver = RefResolver.from_schema(schema)
    self._validator = jsonschema.validators.validator_for(schema)(
        schema, resolver=resolver)
    self._validate = self._validator.validate

  def ValidateSchemaVersion(self, schema, path):
    """Validates the parsed_yaml JSON schema version."""
    try:
      version = schema.get('$schema')
    except AttributeError:
      version = None
    if (not version or
        not version.startswith('http://json-schema.org/') or
        not version.endswith('/schema#')):
      raise InvalidSchemaVersionError(
          'Schema [{}] version [{}] is invalid. Expected "$schema: '
          'http://json-schema.org/*/schema#".'.format(path, version))

  def Validate(self, parsed_yaml):
    """Validates parsed_yaml against JSON schema.

    Args:
      parsed_yaml: YAML to validate

    Raises:
      ValidationError: if the template doesn't obey the schema.
    """
    try:
      self._validate(parsed_yaml)
    except jsonschema.RefResolutionError as e:
      raise RefError(e)
    except jsonschema.ValidationError as e:
      raise ValidationError(e)

  def ValidateWithDetailedError(self, parsed_yaml):
    """Validates parsed_yaml against JSON schema.

    Provides details of validation failure in the returned error message.
    Args:
      parsed_yaml: YAML to validate

    Raises:
      ValidationError: if the template doesn't obey the schema.
    """
    try:
      self._validate(parsed_yaml)
    except jsonschema.RefResolutionError as e:
      raise RefError(e)
    except jsonschema.exceptions.ValidationError as ve:
      msg = io.StringIO()
      msg.write('ERROR: Schema validation failed: {}\n\n'.format(ve))

      if ve.cause:
        additional_exception = 'Root Exception: {}'.format(ve.cause)
      else:
        additional_exception = ''

      root_error = ve.context[-1] if ve.context else None
      if root_error:
        error_path = ''.join(
            ('[{}]'.format(elem) for elem in root_error.absolute_path))
      else:
        error_path = ''

      msg.write('Additional Details:\n'
                'Error Message: {msg}\n\n'
                'Failing Validation Schema: {schema}\n\n'
                'Failing Element: {instance}\n\n'
                'Failing Element Path: {path}\n\n'
                '{additional_cause}\n'.format(
                    msg=root_error.message if root_error else None,
                    instance=root_error.instance if root_error else None,
                    schema=root_error.schema if root_error else None,
                    path=error_path,
                    additional_cause=additional_exception))
      ve.message = msg.getvalue()
      raise ValidationError(ve)

  def Iterate(self, parsed_yaml):
    """Validates parsed_yaml against JSON schema and returns all errors.

    Args:
      parsed_yaml: YAML to validate

    Raises:
      ValidationError: if the template doesn't obey the schema.

    Returns:
      A list of all errors, empty if no validation errors.
    """
    return self._validator.iter_errors(parsed_yaml)