HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/assured/message_util.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for constructing Assured api messages."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.assured import util
from googlecloudsdk.calliope import base as calliope_base
from googlecloudsdk.command_lib.util.apis import arg_utils

ReleaseTrack = calliope_base.ReleaseTrack


def GetMessages(release_track):
  return util.GetMessagesModule(release_track)


def GetWorkloadMessage(release_track):
  return WORKLOAD_MAP.get(release_track)


def GetComplianceRegimesEnum(release_track):
  return GetWorkloadMessage(release_track).ComplianceRegimeValueValuesEnum


def GetPartnersEnum(release_track):
  return GetWorkloadMessage(release_track).PartnerValueValuesEnum


def GetKmsSettings(release_track):
  return KMS_SETTINGS_MAP.get(release_track)


def GetResourceSettings(release_track):
  return RESOURCE_SETTINGS_MAP.get(release_track)


def GetPartnerPermissions(release_track):
  return PARTNER_PERMISSIONS_MAP.get(release_track)


def CreateAssuredParent(organization_id, location):
  return 'organizations/{}/locations/{}'.format(organization_id, location)


def CreateAssuredWorkload(
    display_name=None,
    compliance_regime=None,
    partner=None,
    partner_services_billing_account=None,
    partner_permissions=None,
    billing_account=None,
    next_rotation_time=None,
    rotation_period=None,
    labels=None,
    etag=None,
    provisioned_resources_parent=None,
    resource_settings=None,
    enable_sovereign_controls=None,
    violation_notifications_enabled=None,
    release_track=ReleaseTrack.GA,
):
  """Construct an Assured Workload message for Assured Workloads Beta API requests.

  Args:
    display_name: str, display name of the Assured Workloads environment.
    compliance_regime: str, the compliance regime, which is one of:
      FEDRAMP_MODERATE, FEDRAMP_HIGH, IL4 or CJIS.
    partner: str, the partner regime/controls.
    partner_services_billing_account: str, the billing account of the partner
      service in the form: billingAccounts/{BILLING_ACCOUNT_ID}
    partner_permissions: dict, dictionary of permission names and values for the
      partner regime.
    billing_account: str, the billing account of the Assured Workloads
      environment in the form: billingAccounts/{BILLING_ACCOUNT_ID}
    next_rotation_time: str, the next key rotation time for the Assured
      Workloads environment, for example: 2020-12-30T10:15:00.00Z
    rotation_period: str, the time between key rotations, for example: 172800s.
    labels: dict, dictionary of label keys and values of the Assured Workloads
      environment.
    etag: str, the etag of the Assured Workloads environment.
    provisioned_resources_parent: str, parent of provisioned projects, e.g.
      folders/{FOLDER_ID}.
    resource_settings: list of key=value pairs to set customized resource
      settings, which can be one of the following: consumer-project-id,
      consumer-project-name, encryption-keys-project-id,
      encryption-keys-project-name or keyring-id, for example:
      consumer-project-id={ID1},encryption-keys-project-id={ID2}
    enable_sovereign_controls: bool, whether to enable sovereign controls for
      the Assured Workloads environment.
    violation_notifications_enabled: bool, whether email notifications are
      enabled or disabled
    release_track: ReleaseTrack, gcloud release track being used

  Returns:
    A populated Assured Workloads message for the Assured Workloads Beta API.
  """

  workload_message = GetWorkloadMessage(release_track)
  workload = workload_message()
  if etag:
    workload.etag = etag
  if billing_account:
    workload.billingAccount = billing_account
  if display_name:
    workload.displayName = display_name
  if violation_notifications_enabled:
    workload.violationNotificationsEnabled = GetViolationNotificationsEnabled(
        violation_notifications_enabled
    )
  if labels:
    workload.labels = CreateLabels(labels, workload_message)
  if compliance_regime:
    workload.complianceRegime = arg_utils.ChoiceToEnum(
        compliance_regime, GetComplianceRegimesEnum(release_track)
    )
  if partner:
    workload.partner = arg_utils.ChoiceToEnum(
        partner, GetPartnersEnum(release_track)
    )
  if partner_services_billing_account:
    workload.partnerServicesBillingAccount = partner_services_billing_account
  if partner_permissions:
    workload.partnerPermissions = GetPartnerPermissions(release_track)(
        dataLogsViewer=partner_permissions['data-logs-viewer']
    )
  if provisioned_resources_parent:
    workload.provisionedResourcesParent = provisioned_resources_parent
  if next_rotation_time and rotation_period:
    workload.kmsSettings = GetKmsSettings(release_track)(
        nextRotationTime=next_rotation_time, rotationPeriod=rotation_period
    )
  if resource_settings:
    workload.resourceSettings = CreateResourceSettingsList(
        resource_settings, release_track
    )
  if enable_sovereign_controls:
    workload.enableSovereignControls = enable_sovereign_controls
  return workload


def CreateAssuredWorkloadsParent(organization_id, location, workload_id):
  return 'organizations/{}/locations/{}/workloads/{}'.format(
      organization_id, location, workload_id
  )


def GetViolationNotificationsEnabled(violation_notifications_enabled):
  if violation_notifications_enabled.lower() == 'true':
    return True
  if violation_notifications_enabled.lower() == 'false':
    return False
  else:
    return violation_notifications_enabled


def CreateLabels(labels, workload_message):
  workload_labels = []
  for key, value in labels.items():
    new_label = workload_message.LabelsValue.AdditionalProperty(
        key=key, value=value
    )
    workload_labels.append(new_label)
  return workload_message.LabelsValue(additionalProperties=workload_labels)


def CreateResourceSettingsList(resource_settings, release_track):
  """Construct a list of ResourceSettings for Assured Workload object.

  Args:
    resource_settings: a list of key=value pairs of customized resource
      settings.
    release_track: ReleaseTrack, gcloud release track being used.

  Returns:
    A list of ResourceSettings for the Assured Workload object.
  """
  resource_settings_dict = {}
  for key, value in resource_settings.items():
    resource_type = GetResourceType(key, release_track)
    resource_settings = (
        resource_settings_dict[resource_type]
        if resource_type in resource_settings_dict
        else CreateResourceSettings(resource_type, release_track)
    )
    if key.endswith('-id'):
      resource_settings.resourceId = value
    elif key.endswith('-name'):
      resource_settings.displayName = value
    resource_settings_dict[resource_type] = resource_settings
  return list(resource_settings_dict.values())


def GetResourceType(key, release_track):
  """Returns a resource settings type from the key.

  Args:
    key: str, the setting name, which can be one of the following -
      consumer-project-id, consumer-project-name, encryption-keys-project-id,
      encryption-keys-project-name or keyring-id.
    release_track: ReleaseTrack, gcloud release track being used.
  """
  resource_settings_message = GetResourceSettings(release_track)
  if key.startswith('consumer-project'):
    return (
        resource_settings_message.ResourceTypeValueValuesEnum.CONSUMER_PROJECT
    )
  elif key.startswith('encryption-keys-project'):
    return (
        resource_settings_message.ResourceTypeValueValuesEnum.ENCRYPTION_KEYS_PROJECT
    )
  elif key.startswith('keyring'):
    return resource_settings_message.ResourceTypeValueValuesEnum.KEYRING


def CreateResourceSettings(resource_type, release_track):
  resource_settings_message = GetResourceSettings(release_track)
  return resource_settings_message(resourceType=resource_type)


def CreateUpdateMask(display_name, labels, violation_notifications_enabled):
  update_mask = []
  if display_name:
    update_mask.append('workload.display_name')
  if labels:
    update_mask.append('workload.labels')
  if violation_notifications_enabled:
    update_mask.append('workload.violation_notifications_enabled')
  return ','.join(update_mask)


def CreateCreateRequest(
    external_id, parent, workload, release_track=ReleaseTrack.GA
):
  """Construct an Assured Workload Create Request for Assured Workloads API requests.

  Args:
    external_id: str, the identifier that identifies this Assured Workloads
      environment externally.
    parent: str, the parent organization of the Assured Workloads environment to
      be created, in the form: organizations/{ORG_ID}/locations/{LOCATION}.
    workload: Workload, new Assured Workloads environment containing the values
      to be used.
    release_track: ReleaseTrack, gcloud release track being used

  Returns:
    A populated Assured Workloads Update Request for the Assured Workloads API.
  """
  if release_track == ReleaseTrack.GA:
    return util.GetMessagesModule(
        release_track
    ).AssuredworkloadsOrganizationsLocationsWorkloadsCreateRequest(
        externalId=external_id,
        parent=parent,
        googleCloudAssuredworkloadsV1Workload=workload,
    )
  else:
    return util.GetMessagesModule(
        release_track
    ).AssuredworkloadsOrganizationsLocationsWorkloadsCreateRequest(
        externalId=external_id,
        parent=parent,
        googleCloudAssuredworkloadsV1beta1Workload=workload,
    )


def CreateUpdateRequest(
    workload, name, update_mask, release_track=ReleaseTrack.GA
):
  """Construct an Assured Workload Update Request for Assured Workloads API requests.

  Args:
    workload: googleCloudAssuredworkloadsV1beta1Workload, new Assured Workloads
      environment containing the new configuration values to be used.
    name: str, the name for the Assured Workloads environment being updated in
      the form:
      organizations/{ORG_ID}/locations/{LOCATION}/workloads/{WORKLOAD_ID}.
    update_mask: str, list of the fields to be updated, for example,
      workload.display_name,workload.labels
    release_track: ReleaseTrack, gcloud release track being used

  Returns:
    A populated Assured Workloads Update Request for the Assured Workloads API.
  """
  messages = util.GetMessagesModule(release_track)
  if release_track == ReleaseTrack.GA:
    return messages.AssuredworkloadsOrganizationsLocationsWorkloadsPatchRequest(
        googleCloudAssuredworkloadsV1Workload=workload,
        name=name,
        updateMask=update_mask,
    )
  else:
    return messages.AssuredworkloadsOrganizationsLocationsWorkloadsPatchRequest(
        googleCloudAssuredworkloadsV1beta1Workload=workload,
        name=name,
        updateMask=update_mask,
    )


def CreateAcknowledgeRequest(
    name, comment, acknowledge_type=None, release_track=ReleaseTrack.GA
):
  """Construct an Assured Workload Violation Acknowledgement Request.

  Args:
    name: str, the name for the Assured Workloads violation being described in
      the form:
      organizations/{ORG_ID}/locations/{LOCATION}/workloads/{WORKLOAD_ID}/violations/{VIOLATION_ID}.
    comment: str, the business justification which the user wants to add while
      acknowledging a violation.
    acknowledge_type: str, the acknowledge type for specified violation, which
      is one of: SINGLE_VIOLATION - to acknowledge specified violation,
      EXISTING_CHILD_RESOURCE_VIOLATIONS - to acknowledge specified org policy
      violation and all associated child resource violations.
    release_track: ReleaseTrack, gcloud release track being used

  Returns:
    A populated Assured Workloads Violation Acknowledgement Request.
  """
  messages = util.GetMessagesModule(release_track)
  if acknowledge_type:
    acknowledge_type = messages.GoogleCloudAssuredworkloadsV1beta1AcknowledgeViolationRequest.AcknowledgeTypeValueValuesEnum(
        acknowledge_type
    )
  if release_track == ReleaseTrack.GA:
    return messages.AssuredworkloadsOrganizationsLocationsWorkloadsViolationsAcknowledgeRequest(
        googleCloudAssuredworkloadsV1AcknowledgeViolationRequest=messages.GoogleCloudAssuredworkloadsV1AcknowledgeViolationRequest(
            comment=comment
        ),
        name=name,
    )
  else:
    return messages.AssuredworkloadsOrganizationsLocationsWorkloadsViolationsAcknowledgeRequest(
        googleCloudAssuredworkloadsV1beta1AcknowledgeViolationRequest=messages.GoogleCloudAssuredworkloadsV1beta1AcknowledgeViolationRequest(
            comment=comment, acknowledgeType=acknowledge_type
        ),
        name=name,
    )


WORKLOAD_MAP = {
    ReleaseTrack.ALPHA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1Workload,
    ReleaseTrack.BETA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1Workload,
    ReleaseTrack.GA: GetMessages(
        ReleaseTrack.GA
    ).GoogleCloudAssuredworkloadsV1Workload,
}

KMS_SETTINGS_MAP = {
    ReleaseTrack.ALPHA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1WorkloadKMSSettings,
    ReleaseTrack.BETA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1WorkloadKMSSettings,
    ReleaseTrack.GA: GetMessages(
        ReleaseTrack.GA
    ).GoogleCloudAssuredworkloadsV1WorkloadKMSSettings,
}

RESOURCE_SETTINGS_MAP = {
    ReleaseTrack.ALPHA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1WorkloadResourceSettings,
    ReleaseTrack.BETA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1WorkloadResourceSettings,
    ReleaseTrack.GA: GetMessages(
        ReleaseTrack.GA
    ).GoogleCloudAssuredworkloadsV1WorkloadResourceSettings,
}

PARTNER_PERMISSIONS_MAP = {
    ReleaseTrack.ALPHA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1WorkloadPartnerPermissions,
    ReleaseTrack.BETA: GetMessages(
        ReleaseTrack.BETA
    ).GoogleCloudAssuredworkloadsV1beta1WorkloadPartnerPermissions,
    ReleaseTrack.GA: GetMessages(
        ReleaseTrack.GA
    ).GoogleCloudAssuredworkloadsV1WorkloadPartnerPermissions,
}