HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/compute/security_policies/client.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Security policy."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.calliope import exceptions as calliope_exceptions


class SecurityPolicy(object):
  """Builds, and optionally sends, requests for a single security policy.

  The policy is identified by `ref`. When `ref` has a `region` attribute whose
  value is not None, requests are addressed to the `regionSecurityPolicies`
  collection; otherwise they go to the global `securityPolicies` collection.
  """

  def __init__(self, ref, compute_client=None):
    """Stores the resource reference and the compute client holder.

    Args:
      ref: Resource reference; `project` and `Name()` are read from it, and
        `region` when present.
      compute_client: Object exposing `apitools_client`, `messages` and
        `MakeRequests`.
    """
    self._compute_client = compute_client
    self.ref = ref

  @property
  def _client(self):
    """The apitools client taken from the compute client holder."""
    return self._compute_client.apitools_client

  @property
  def _messages(self):
    """The messages module taken from the compute client holder."""
    return self._compute_client.messages

  def _SendOrReturn(self, request_tuple, only_generate_request):
    """Wraps one request tuple in a list, then sends it or hands it back.

    Args:
      request_tuple: A (collection, verb, request) tuple.
      only_generate_request: If true, the single-element list is returned
        without being sent.

    Returns:
      The list of request tuples, or the result of `MakeRequests` on it.
    """
    batch = [request_tuple]
    if only_generate_request:
      return batch
    return self._compute_client.MakeRequests(batch)

  def _MakeDeleteRequestTuple(self):
    """Returns the (collection, 'Delete', request) tuple for this policy."""
    policy_region = getattr(self.ref, 'region', None)
    if policy_region is None:
      request = self._messages.ComputeSecurityPoliciesDeleteRequest(
          project=self.ref.project, securityPolicy=self.ref.Name())
      return (self._client.securityPolicies, 'Delete', request)
    request = self._messages.ComputeRegionSecurityPoliciesDeleteRequest(
        project=self.ref.project,
        region=policy_region,
        securityPolicy=self.ref.Name())
    return (self._client.regionSecurityPolicies, 'Delete', request)

  def _MakeDescribeRequestTuple(self):
    """Returns the (collection, 'Get', request) tuple for this policy."""
    policy_region = getattr(self.ref, 'region', None)
    if policy_region is None:
      request = self._messages.ComputeSecurityPoliciesGetRequest(
          project=self.ref.project, securityPolicy=self.ref.Name())
      return (self._client.securityPolicies, 'Get', request)
    request = self._messages.ComputeRegionSecurityPoliciesGetRequest(
        project=self.ref.project,
        region=policy_region,
        securityPolicy=self.ref.Name())
    return (self._client.regionSecurityPolicies, 'Get', request)

  def _MakeCreateRequestTuple(self, security_policy):
    """Returns the (collection, 'Insert', request) tuple.

    Args:
      security_policy: The security policy message to insert.

    Returns:
      A tuple containing the resource collection, verb, and request.
    """
    policy_region = getattr(self.ref, 'region', None)
    if policy_region is None:
      request = self._messages.ComputeSecurityPoliciesInsertRequest(
          project=self.ref.project, securityPolicy=security_policy)
      return (self._client.securityPolicies, 'Insert', request)
    request = self._messages.ComputeRegionSecurityPoliciesInsertRequest(
        project=self.ref.project,
        region=policy_region,
        securityPolicy=security_policy)
    return (self._client.regionSecurityPolicies, 'Insert', request)

  def _MakePatchRequestTuple(self, security_policy, update_mask=None):
    """Generates a SecurityPolicies Patch request.

    Args:
      security_policy: The updated security policy
      update_mask: Field mask for clearing fields. Only forwarded for regional
        policies, and only when truthy.

    Returns:
      A tuple containing the resource collection, verb, and request.
    """
    policy_region = getattr(self.ref, 'region', None)
    if policy_region is None:
      # NOTE(review): update_mask is not forwarded on the global path; confirm
      # whether the global patch request message accepts a mask before
      # changing this.
      request = self._messages.ComputeSecurityPoliciesPatchRequest(
          project=self.ref.project,
          securityPolicy=self.ref.Name(),
          securityPolicyResource=security_policy)
      return (self._client.securityPolicies, 'Patch', request)

    request_fields = dict(
        project=self.ref.project,
        region=policy_region,
        securityPolicy=self.ref.Name(),
        securityPolicyResource=security_policy)
    # The mask keyword is left out entirely (not passed as None) when no mask
    # was supplied.
    if update_mask:
      request_fields['updateMask'] = update_mask
    request = self._messages.ComputeRegionSecurityPoliciesPatchRequest(
        **request_fields)
    return (self._client.regionSecurityPolicies, 'Patch', request)

  def Delete(self, only_generate_request=False):
    """Makes and optionally sends a request to delete the policy."""
    return self._SendOrReturn(
        self._MakeDeleteRequestTuple(), only_generate_request)

  def Describe(self, only_generate_request=False):
    """Makes and optionally sends a request to fetch the policy."""
    return self._SendOrReturn(
        self._MakeDescribeRequestTuple(), only_generate_request)

  def Create(self, security_policy=None, only_generate_request=False):
    """Makes and optionally sends a request to insert the policy."""
    return self._SendOrReturn(
        self._MakeCreateRequestTuple(security_policy), only_generate_request)

  def Patch(
      self, security_policy=None, field_mask=None, only_generate_request=False
  ):
    """Makes and optionally sends a request to patch the policy."""
    return self._SendOrReturn(
        self._MakePatchRequestTuple(security_policy, field_mask),
        only_generate_request)


class SecurityPolicyRule(object):
  """Abstracts SecurityPolicyRule resource.

  The rule is identified by `ref`: `ref.Name()` supplies the rule priority,
  `ref.securityPolicy` the owning policy and `ref.project` the project. When
  `ref` has a `region` attribute that is not None, requests are addressed to
  the `regionSecurityPolicies` collection instead of `securityPolicies`.
  """

  # Hyphenated action names and the strings written into the rule's `action`
  # field for them. Any action not listed here is passed through unchanged.
  _ACTION_ALIASES = {
      'deny-403': 'deny(403)',
      'deny-404': 'deny(404)',
      'deny-502': 'deny(502)',
      'redirect-to-recaptcha': 'redirect_to_recaptcha',
      'rate-based-ban': 'rate_based_ban',
  }

  def __init__(self, ref, compute_client=None):
    """Stores the rule reference and the compute client holder.

    Args:
      ref: Resource reference for the rule.
      compute_client: Object exposing `apitools_client`, `messages` and
        `MakeRequests`.
    """
    self.ref = ref
    self._compute_client = compute_client

  @property
  def _client(self):
    """The apitools client taken from the compute client holder."""
    return self._compute_client.apitools_client

  @property
  def _messages(self):
    """The messages module taken from the compute client holder."""
    return self._compute_client.messages

  def _IsRegional(self):
    """Returns True when the reference carries a non-None region."""
    return getattr(self.ref, 'region', None) is not None

  def _ConvertPriorityToInt(self, priority):
    """Converts a priority to a non-negative int.

    Args:
      priority: The priority, normally the string returned by `ref.Name()`.

    Returns:
      The priority as an int.

    Raises:
      calliope_exceptions.InvalidArgumentException: If the value cannot be
        converted to an int or is negative.
    """
    try:
      int_priority = int(priority)
    except (TypeError, ValueError):
      # int() raises TypeError (not ValueError) for non-numeric types such as
      # None; report both the same way instead of leaking a raw TypeError.
      raise calliope_exceptions.InvalidArgumentException(
          'priority', 'priority must be a valid non-negative integer.')
    if int_priority < 0:
      raise calliope_exceptions.InvalidArgumentException(
          'priority', 'priority must be a valid non-negative integer.')
    return int_priority

  def _ConvertAction(self, action):
    """Maps a hyphenated action name to its rule `action` string.

    Args:
      action: The action name.

    Returns:
      The mapped string, or `action` itself when there is no mapping.
    """
    return self._ACTION_ALIASES.get(action, action)

  def _MakeSrcIpMatcher(self, src_ip_ranges):
    """Returns a SRC_IPS_V1 matcher for the given IP ranges."""
    matcher_type = self._messages.SecurityPolicyRuleMatcher
    return matcher_type(
        versionedExpr=matcher_type.VersionedExprValueValuesEnum('SRC_IPS_V1'),
        config=self._messages.SecurityPolicyRuleMatcherConfig(
            srcIpRanges=src_ip_ranges
        ),
    )

  def _MakeExprMatcher(self, expression):
    """Returns a matcher wrapping the given expression string."""
    return self._messages.SecurityPolicyRuleMatcher(
        expr=self._messages.Expr(expression=expression)
    )

  def _MakeRule(self, action, description, preview, **match_fields):
    """Returns a SecurityPolicyRule message for this reference's priority.

    Args:
      action: The action name; converted with `_ConvertAction`.
      description: The description of the rule.
      preview: The preview flag, stored as given.
      **match_fields: Exactly one of `match=` or `networkMatch=`.

    Returns:
      The SecurityPolicyRule message.

    Raises:
      calliope_exceptions.InvalidArgumentException: If the priority taken from
        the reference is invalid.
    """
    return self._messages.SecurityPolicyRule(
        priority=self._ConvertPriorityToInt(self.ref.Name()),
        description=description,
        action=self._ConvertAction(action),
        preview=preview,
        **match_fields
    )

  def _ApplyOptionalFields(
      self, rule, redirect_options, request_headers_to_add, rate_limit_options
  ):
    """Sets the optional rule fields for arguments that are not None."""
    if redirect_options is not None:
      rule.redirectOptions = redirect_options
    if request_headers_to_add is not None:
      rule.headerAction = self._ConvertRequestHeadersToAdd(
          request_headers_to_add)
    if rate_limit_options is not None:
      rule.rateLimitOptions = rate_limit_options

  def _MakeDeleteRequestTuple(self):
    """Returns the (collection, 'RemoveRule', request) tuple for this rule."""
    if self._IsRegional():
      return (self._client.regionSecurityPolicies, 'RemoveRule',
              self._messages.ComputeRegionSecurityPoliciesRemoveRuleRequest(
                  project=self.ref.project,
                  priority=self._ConvertPriorityToInt(self.ref.Name()),
                  region=self.ref.region,
                  securityPolicy=self.ref.securityPolicy))
    return (self._client.securityPolicies, 'RemoveRule',
            self._messages.ComputeSecurityPoliciesRemoveRuleRequest(
                project=self.ref.project,
                priority=self._ConvertPriorityToInt(self.ref.Name()),
                securityPolicy=self.ref.securityPolicy))

  def _MakeDescribeRequestTuple(self):
    """Returns the (collection, 'GetRule', request) tuple for this rule."""
    if self._IsRegional():
      return (self._client.regionSecurityPolicies, 'GetRule',
              self._messages.ComputeRegionSecurityPoliciesGetRuleRequest(
                  project=self.ref.project,
                  priority=self._ConvertPriorityToInt(self.ref.Name()),
                  region=self.ref.region,
                  securityPolicy=self.ref.securityPolicy))
    return (self._client.securityPolicies, 'GetRule',
            self._messages.ComputeSecurityPoliciesGetRuleRequest(
                project=self.ref.project,
                priority=self._ConvertPriorityToInt(self.ref.Name()),
                securityPolicy=self.ref.securityPolicy))

  def _MakeCreateRequestTuple(
      self,
      src_ip_ranges,
      expression,
      expression_options,
      network_matcher,
      action,
      description,
      preview,
      redirect_options,
      rate_limit_options,
      request_headers_to_add,
  ):
    """Generates a SecurityPolicies AddRule request.

    Args:
      src_ip_ranges: The list of IP ranges to match.
      expression: The CEVAL expression to match.
      expression_options: The configuration options when specifying a CEVAL
        expression.
      network_matcher: Net LB fields to match. When truthy it takes precedence
        and src_ip_ranges, expression and expression_options are ignored.
      action: The action to enforce on match.
      description: The description of the rule.
      preview: If true, the action will not be enforced.
      redirect_options: Parameters defining the redirect action, such as
        redirect type and redirect target.
      rate_limit_options: The rate limiting behavior for this rule.
      request_headers_to_add: A dict of headers to add to requests that match
        this rule.

    Returns:
      A tuple containing the resource collection, verb, and request.

    Raises:
      AssertionError: If none of network_matcher, src_ip_ranges and expression
        is provided.
      calliope_exceptions.InvalidArgumentException: If the rule priority is
        invalid.
    """
    if network_matcher:
      match_fields = {'networkMatch': network_matcher}
    else:
      if src_ip_ranges:
        matcher = self._MakeSrcIpMatcher(src_ip_ranges)
      else:
        assert expression is not None, (
            'one of network_matcher, src_ip_ranges or expression is required')
        matcher = self._MakeExprMatcher(expression)
      if expression_options:
        matcher.exprOptions = expression_options
      match_fields = {'match': matcher}

    security_policy_rule = self._MakeRule(
        action, description, preview, **match_fields)
    self._ApplyOptionalFields(
        security_policy_rule,
        redirect_options,
        request_headers_to_add,
        rate_limit_options,
    )

    if self._IsRegional():
      return (self._client.regionSecurityPolicies, 'AddRule',
              self._messages.ComputeRegionSecurityPoliciesAddRuleRequest(
                  project=self.ref.project,
                  securityPolicyRule=security_policy_rule,
                  region=self.ref.region,
                  securityPolicy=self.ref.securityPolicy))
    return (self._client.securityPolicies, 'AddRule',
            self._messages.ComputeSecurityPoliciesAddRuleRequest(
                project=self.ref.project,
                securityPolicyRule=security_policy_rule,
                securityPolicy=self.ref.securityPolicy))

  def _MakePatchRequestTuple(
      self,
      src_ip_ranges,
      expression,
      expression_options,
      network_matcher,
      action,
      description,
      preview,
      redirect_options,
      rate_limit_options,
      request_headers_to_add,
      preconfig_waf_config,
      update_mask,
  ):
    """Generates a SecurityPolicies PatchRule request.

    Unlike rule creation, no matcher is required here: when neither
    src_ip_ranges, expression nor expression_options is given, the rule's
    `match` field is None.

    Args:
      src_ip_ranges: The list of IP ranges to match.
      expression: The CEVAL expression to match.
      expression_options: The configuration options when specifying a CEVAL
        expression.
      network_matcher: Net LB fields to match. When truthy it takes precedence
        and src_ip_ranges, expression and expression_options are ignored.
      action: The action to enforce on match.
      description: The description of the rule.
      preview: If true, the action will not be enforced.
      redirect_options: Parameters defining the redirect action, such as
        redirect type and redirect target.
      rate_limit_options: The rate limiting behavior for this rule.
      request_headers_to_add: A dict of headers to add to requests that match
        this rule.
      preconfig_waf_config: preconfigured WAF configuration to be applied for
        this rule.
      update_mask: Field mask for clearing fields

    Returns:
      A tuple containing the resource collection, verb, and request.

    Raises:
      calliope_exceptions.InvalidArgumentException: If the rule priority is
        invalid.
    """
    if network_matcher:
      match_fields = {'networkMatch': network_matcher}
    else:
      matcher = None
      if src_ip_ranges:
        matcher = self._MakeSrcIpMatcher(src_ip_ranges)
      elif expression:
        matcher = self._MakeExprMatcher(expression)
      if expression_options:
        # Options alone still need a matcher message to hang off.
        if matcher is None:
          matcher = self._messages.SecurityPolicyRuleMatcher()
        matcher.exprOptions = expression_options
      match_fields = {'match': matcher}

    security_policy_rule = self._MakeRule(
        action, description, preview, **match_fields)
    self._ApplyOptionalFields(
        security_policy_rule,
        redirect_options,
        request_headers_to_add,
        rate_limit_options,
    )
    if preconfig_waf_config is not None:
      security_policy_rule.preconfiguredWafConfig = preconfig_waf_config

    # The request addresses the rule by priority as well; convert it once
    # here and reuse it for whichever collection is targeted.
    priority = self._ConvertPriorityToInt(self.ref.Name())
    if self._IsRegional():
      return (
          self._client.regionSecurityPolicies,
          'PatchRule',
          self._messages.ComputeRegionSecurityPoliciesPatchRuleRequest(
              project=self.ref.project,
              priority=priority,
              securityPolicyRule=security_policy_rule,
              region=self.ref.region,
              securityPolicy=self.ref.securityPolicy,
              updateMask=update_mask,
          ),
      )
    return (self._client.securityPolicies, 'PatchRule',
            self._messages.ComputeSecurityPoliciesPatchRuleRequest(
                project=self.ref.project,
                priority=priority,
                securityPolicyRule=security_policy_rule,
                securityPolicy=self.ref.securityPolicy,
                updateMask=update_mask))

  def _ConvertRequestHeadersToAdd(self, request_headers_to_add):
    """Converts a request-headers-to-add dict into an HttpHeaderAction.

    Args:
      request_headers_to_add: A dict mapping header names to header values to
        add to requests that match this rule. Leading and trailing whitespace
        is stripped from each header name; only leading whitespace is stripped
        from each header value.

    Returns:
      An HttpHeaderAction object with a populated requestHeadersToAdds field.
    """
    header_action = self._messages.SecurityPolicyRuleHttpHeaderAction()
    for hdr_name, hdr_val in request_headers_to_add.items():
      header_to_add = (
          self._messages.SecurityPolicyRuleHttpHeaderActionHttpHeaderOption())
      header_to_add.headerName = hdr_name.strip()
      header_to_add.headerValue = hdr_val.lstrip()
      header_action.requestHeadersToAdds.append(header_to_add)
    return header_action

  def Delete(self, only_generate_request=False):
    """Make and optionally send a request to Delete a security policy rule."""
    requests = [self._MakeDeleteRequestTuple()]
    if not only_generate_request:
      return self._compute_client.MakeRequests(requests)
    return requests

  def Describe(self, only_generate_request=False):
    """Make and optionally send a request to Describe a security policy rule."""
    requests = [self._MakeDescribeRequestTuple()]
    if not only_generate_request:
      return self._compute_client.MakeRequests(requests)
    return requests

  def Create(
      self,
      src_ip_ranges=None,
      expression=None,
      expression_options=None,
      network_matcher=None,
      action=None,
      description=None,
      preview=False,
      redirect_options=None,
      rate_limit_options=None,
      request_headers_to_add=None,
      only_generate_request=False,
  ):
    """Make and optionally send a request to Create a security policy rule."""
    requests = [
        self._MakeCreateRequestTuple(
            src_ip_ranges,
            expression,
            expression_options,
            network_matcher,
            action,
            description,
            preview,
            redirect_options,
            rate_limit_options,
            request_headers_to_add,
        )
    ]
    if not only_generate_request:
      return self._compute_client.MakeRequests(requests)
    return requests

  def Patch(
      self,
      src_ip_ranges=None,
      expression=None,
      expression_options=None,
      network_matcher=None,
      action=None,
      description=None,
      preview=None,
      redirect_options=None,
      rate_limit_options=None,
      request_headers_to_add=None,
      preconfig_waf_config=None,
      update_mask=None,
      only_generate_request=False,
  ):
    """Make and optionally send a request to Patch a security policy rule."""
    requests = [
        self._MakePatchRequestTuple(
            src_ip_ranges,
            expression,
            expression_options,
            network_matcher,
            action,
            description,
            preview,
            redirect_options,
            rate_limit_options,
            request_headers_to_add,
            preconfig_waf_config,
            update_mask,
        )
    ]
    if not only_generate_request:
      return self._compute_client.MakeRequests(requests)
    return requests