HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/compute/interconnects/groups/client.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Interconnect Group."""


from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals


class InterconnectGroup(object):
  """Abstracts Interconnect Group resource.

  Each public operation builds a `(service, method_name, request_message)`
  tuple for the `interconnectGroups` collection and, unless
  `only_generate_request` is set, sends it through the compute client's
  `MakeRequests`.

  Attributes:
    ref: Resource reference of the group. `ref.Name()` supplies the group name
      and `ref.project` the project for every request except Insert and Patch.
    project: Project used for the Insert and Patch requests.
  """

  def __init__(self, ref, project, compute_client=None, resources=None):
    """Store the collaborators; no request is made here.

    Args:
      ref: Resource reference of the interconnect group.
      project: Project used for the Insert and Patch requests.
      compute_client: Object exposing `apitools_client`, `messages` and
        `MakeRequests`. Required by every method that builds or sends a
        request.
      resources: Resource registry exposing `Create(collection, **params)`;
        used to turn member interconnect names into self links.
    """
    self.ref = ref
    self.project = project
    self._compute_client = compute_client
    self._resources = resources

  @property
  def _client(self):
    """The apitools client held by the compute client."""
    return self._compute_client.apitools_client

  @property
  def _messages(self):
    """The messages module held by the compute client."""
    return self._compute_client.messages

  def MakeInterconnectGroupsCreateMembersInterconnectInput(
      self,
      facility: str = None,
      description: str = None,
      name: str = None,
      link_type: str = None,
      requested_link_count: int = 1,
      interconnect_type: str = None,
      admin_enabled: bool = True,
      noc_contact_email: str = None,
      customer_name: str = None,
      remote_location: str = None,
      requested_features: str = None,
  ):
    """Make an InterconnectGroupsCreateMembersInterconnectInput.

    Every argument is forwarded unchanged to the message field of the same
    (camelCased) name; arguments left as None are passed through as None.

    Args:
      facility: Value for the `facility` field.
      description: Value for the `description` field.
      name: Value for the `name` field.
      link_type: Value for the `linkType` field.
      requested_link_count: Value for the `requestedLinkCount` field.
      interconnect_type: Value for the `interconnectType` field.
      admin_enabled: Value for the `adminEnabled` field.
      noc_contact_email: Value for the `nocContactEmail` field.
      customer_name: Value for the `customerName` field.
      remote_location: Value for the `remoteLocation` field.
      requested_features: Value for the `requestedFeatures` field.
        NOTE(review): annotated as str, but the API field may be repeated -
        confirm against callers.

    Returns:
      The populated InterconnectGroupsCreateMembersInterconnectInput message.
    """
    return self._messages.InterconnectGroupsCreateMembersInterconnectInput(
        facility=facility,
        description=description,
        name=name,
        linkType=link_type,
        requestedLinkCount=requested_link_count,
        interconnectType=interconnect_type,
        adminEnabled=admin_enabled,
        nocContactEmail=noc_contact_email,
        customerName=customer_name,
        remoteLocation=remote_location,
        requestedFeatures=requested_features,
    )

  def _MakeAdditionalProperties(self, interconnects):
    """Build the map entries for the group's `interconnects` field.

    Args:
      interconnects: Iterable of interconnect names.

    Returns:
      A list with one AdditionalProperty per name. The key is the name itself
      and the value wraps the interconnect's self link, resolved in
      `self.ref.project`.
    """
    return [
        self._messages.InterconnectGroup.InterconnectsValue.AdditionalProperty(
            # The keys are arbitrary strings that only need to
            # be unique, and we use the Interconnect name.
            key=interconnect,
            value=self._messages.InterconnectGroupInterconnect(
                interconnect=self._resources.Create(
                    'compute.interconnects',
                    interconnect=interconnect,
                    project=self.ref.project,
                ).SelfLink()
            ),
        )
        for interconnect in interconnects
    ]

  def _MakeInterconnectsValue(self, interconnects):
    """Wrap the member interconnect names in an InterconnectsValue message."""
    return self._messages.InterconnectGroup.InterconnectsValue(
        additionalProperties=self._MakeAdditionalProperties(interconnects)
    )

  def _MakeRequestOrReturn(self, request_tuple, only_generate_request):
    """Send a single request, or hand it back unsent.

    Args:
      request_tuple: A `(service, method_name, request_message)` tuple.
      only_generate_request: If True, nothing is sent.

    Returns:
      A one-element list holding `request_tuple` when
      `only_generate_request` is True; otherwise the first element of what
      `MakeRequests` returns.
    """
    requests = [request_tuple]
    if only_generate_request:
      return requests
    return self._compute_client.MakeRequests(requests)[0]

  def _MakeCreateRequestTuple(
      self,
      description,
      topology_capability,
      interconnects,
  ):
    """Make a tuple for interconnect group insert request.

    Args:
      description: String that represents the description of the Cloud
        Interconnect Group resource.
      topology_capability: String that represents the topology capability of the
        Cloud Interconnect Group resource.
      interconnects: List of strings that represent the names of the Cloud
        Interconnect resources that are members of the Cloud Interconnect Group
        resource.

    Returns:
      Insert interconnect group tuple that can be used in a request.
    """
    messages = self._messages
    group = messages.InterconnectGroup(
        intent=messages.InterconnectGroupIntent(
            topologyCapability=topology_capability
        ),
        name=self.ref.Name(),
        description=description,
        interconnects=self._MakeInterconnectsValue(interconnects),
    )
    return (
        self._client.interconnectGroups,
        'Insert',
        messages.ComputeInterconnectGroupsInsertRequest(
            project=self.project,
            interconnectGroup=group,
        ),
    )

  def _MakePatchRequestTuple(
      self, topology_capability, interconnects, update_mask, **kwargs
  ):
    """Make a tuple for interconnect group patch request.

    Args:
      topology_capability: Topology capability for the group's intent. When
        None, no intent is set by this method.
      interconnects: Iterable of member interconnect names. The `interconnects`
        field is always populated, even when this is empty.
      update_mask: Value for the request's `updateMask` field.
      **kwargs: Additional InterconnectGroup fields. They are applied on top of
        `interconnects`, and an `intent` derived from a non-None
        `topology_capability` is applied on top of them.

    Returns:
      Patch interconnect group tuple that can be used in a request.
    """
    messages = self._messages
    group_params = {
        'interconnects': self._MakeInterconnectsValue(interconnects),
    }
    group_params.update(kwargs)
    if topology_capability is not None:
      group_params['intent'] = messages.InterconnectGroupIntent(
          topologyCapability=topology_capability
      )
    return (
        self._client.interconnectGroups,
        'Patch',
        messages.ComputeInterconnectGroupsPatchRequest(
            project=self.project,
            interconnectGroup=self.ref.Name(),
            interconnectGroupResource=messages.InterconnectGroup(
                **group_params
            ),
            updateMask=update_mask,
        ),
    )

  def _MakeDeleteRequestTuple(self):
    """Make a tuple for interconnect group delete request."""
    return (
        self._client.interconnectGroups,
        'Delete',
        self._messages.ComputeInterconnectGroupsDeleteRequest(
            project=self.ref.project, interconnectGroup=self.ref.Name()
        ),
    )

  def _MakeDescribeRequestTuple(self):
    """Make a tuple for interconnect group get request."""
    return (
        self._client.interconnectGroups,
        'Get',
        self._messages.ComputeInterconnectGroupsGetRequest(
            project=self.ref.project, interconnectGroup=self.ref.Name()
        ),
    )

  def _MakeGetOperationalStatusRequestTuple(self):
    """Make a tuple for interconnect group getOperationalStatus request."""
    return (
        self._client.interconnectGroups,
        'GetOperationalStatus',
        self._messages.ComputeInterconnectGroupsGetOperationalStatusRequest(
            project=self.ref.project, interconnectGroup=self.ref.Name()
        ),
    )

  def _MakeCreateMembersRequestTuple(
      self,
      intent_mismatch_behavior,
      template_interconnect,
      member_interconnects,
  ):
    """Make a tuple for interconnect group createMembers request.

    Args:
      intent_mismatch_behavior: Value for the `intentMismatchBehavior` field.
      template_interconnect: Value for the `templateInterconnect` field.
      member_interconnects: Value for the `interconnects` field of the
        createMembers payload.

    Returns:
      CreateMembers tuple that can be used in a request.
    """
    messages = self._messages
    create_members = messages.InterconnectGroupsCreateMembers(
        intentMismatchBehavior=intent_mismatch_behavior,
        templateInterconnect=template_interconnect,
        interconnects=member_interconnects,
    )
    return (
        self._client.interconnectGroups,
        'CreateMembers',
        messages.ComputeInterconnectGroupsCreateMembersRequest(
            project=self.ref.project,
            interconnectGroupsCreateMembersRequest=(
                messages.InterconnectGroupsCreateMembersRequest(
                    request=create_members,
                )
            ),
            interconnectGroup=self.ref.Name(),
        ),
    )

  def Create(
      self,
      description=None,
      topology_capability=None,
      interconnects=(),
      only_generate_request=False,
  ):
    """Create an interconnect group.

    Args:
      description: Description of the group.
      topology_capability: Topology capability for the group's intent.
      interconnects: Iterable of member interconnect names.
      only_generate_request: If True, return the request list without sending.

    Returns:
      The first response from `MakeRequests`, or the one-element request list
      when `only_generate_request` is True.
    """
    return self._MakeRequestOrReturn(
        self._MakeCreateRequestTuple(
            description,
            topology_capability,
            interconnects,
        ),
        only_generate_request,
    )

  def Delete(self, only_generate_request=False):
    """Delete the interconnect group.

    Args:
      only_generate_request: If True, return the request list without sending.

    Returns:
      The complete list returned by `MakeRequests` (not just its first
      element, unlike the other operations of this class), or the one-element
      request list when `only_generate_request` is True.
    """
    requests = [self._MakeDeleteRequestTuple()]
    if not only_generate_request:
      return self._compute_client.MakeRequests(requests)
    return requests

  def Describe(self, only_generate_request=False):
    """Get the interconnect group.

    Args:
      only_generate_request: If True, return the request list without sending.

    Returns:
      The first response from `MakeRequests`, or the one-element request list
      when `only_generate_request` is True.
    """
    return self._MakeRequestOrReturn(
        self._MakeDescribeRequestTuple(), only_generate_request
    )

  def Patch(
      self,
      topology_capability=None,
      interconnects=(),
      update_mask='',
      only_generate_request=False,
      **kwargs
  ):
    """Patch description, topology capability and member interconnects of an interconnect group.

    Args:
      topology_capability: Topology capability for the group's intent; the
        intent is left unset when this is None.
      interconnects: Iterable of member interconnect names.
      update_mask: Value for the request's `updateMask` field.
      only_generate_request: If True, return the request list without sending.
      **kwargs: Additional InterconnectGroup fields, e.g. `description`.

    Returns:
      The first response from `MakeRequests`, or the one-element request list
      when `only_generate_request` is True.
    """
    return self._MakeRequestOrReturn(
        self._MakePatchRequestTuple(
            topology_capability, interconnects, update_mask, **kwargs
        ),
        only_generate_request,
    )

  def GetOperationalStatus(self, only_generate_request=False):
    """Get the operational status of the interconnect group.

    Args:
      only_generate_request: If True, return the request list without sending.

    Returns:
      The first response from `MakeRequests`, or the one-element request list
      when `only_generate_request` is True.
    """
    return self._MakeRequestOrReturn(
        self._MakeGetOperationalStatusRequestTuple(), only_generate_request
    )

  def CreateMembers(
      self,
      intent_mismatch_behavior=None,
      template_interconnect=None,
      member_interconnects=(),
      only_generate_request=False,
  ):
    """Create member interconnects in an interconnect group.

    Args:
      intent_mismatch_behavior: Value for the `intentMismatchBehavior` field.
      template_interconnect: Value for the `templateInterconnect` field.
      member_interconnects: Value for the `interconnects` field of the
        createMembers payload.
      only_generate_request: If True, return the request list without sending.
        Defaults to False, which keeps the previous always-send behavior.

    Returns:
      The first response from `MakeRequests`, or the one-element request list
      when `only_generate_request` is True.
    """
    return self._MakeRequestOrReturn(
        self._MakeCreateMembersRequestTuple(
            intent_mismatch_behavior,
            template_interconnect,
            member_interconnects,
        ),
        only_generate_request,
    )