File: //snap/google-cloud-cli/current/lib/surface/transfer/authorize.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command to authorize accounts for transfer."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import json
import os
from googlecloudsdk.api_lib.cloudresourcemanager import projects_api
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.projects import util as projects_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.credentials import creds
from googlecloudsdk.core.credentials import store as creds_store
from googlecloudsdk.core.universe_descriptor import universe_descriptor
from googlecloudsdk.core.util import files
# Roles looked for on the account being authorized (the creds-file account or
# the account currently logged into gcloud).
EXPECTED_USER_ROLES = frozenset((
    'roles/owner',
    'roles/storagetransfer.admin',
    'roles/storagetransfer.transferAgent',
    'roles/storage.objectAdmin',
    'roles/pubsub.editor',
))
# Roles looked for on the Google-managed transfer service account.
EXPECTED_P4SA_ROLES = frozenset((
    'roles/storage.admin',
    'roles/storagetransfer.serviceAgent',
))
# Roles looked for on the project's GCS service account.
EXPECTED_GCS_SA_ROLES = frozenset(('roles/pubsub.publisher',))
# IAM member template; filled in with a project number and an email domain.
SERVICE_ACCOUNT_URL_FORMAT = (
    'serviceAccount:service-{project_number}'
    '@{service_account_url_suffix}'
)
def _get_iam_prefixed_email(email_string, is_service_account):
  """Builds the `<member type>:<email>` string used in IAM policy bindings.

  Args:
    email_string: Email address of the account.
    is_service_account: Truthy to use the `serviceAccount` member type,
      otherwise the `user` member type is used.

  Returns:
    The email prefixed with its IAM member type and a colon.
  """
  if is_service_account:
    member_type = 'serviceAccount'
  else:
    member_type = 'user'
  return f'{member_type}:{email_string}'
def _get_iam_prefiexed_gcs_sa_email(project_number):
  """Builds the IAM member string of a project's GCS service account.

  The email domain includes the project prefix of the configured universe
  domain when that prefix is non-empty.

  Args:
    project_number: Project number substituted into the account name.

  Returns:
    SERVICE_ACCOUNT_URL_FORMAT filled in with the project number and the
    `gs-project-accounts` email domain.
  """
  descriptors = universe_descriptor.UniverseDescriptor()
  universe_domain = properties.VALUES.core.universe_domain.Get()
  project_prefix = descriptors.Get(universe_domain).project_prefix

  # Default domain; only universes with a project prefix use a longer one.
  email_domain = 'gs-project-accounts.iam.gserviceaccount.com'
  if project_prefix:
    email_domain = (
        f'gs-project-accounts.{project_prefix}.iam.gserviceaccount.com'
    )

  return SERVICE_ACCOUNT_URL_FORMAT.format(
      project_number=project_number,
      service_account_url_suffix=email_domain,
  )
def _get_existing_transfer_roles_for_account(
    project_iam_policy, prefixed_account_email, roles_set
):
  """Returns roles in IAM policy from roles_set assigned to account email.

  Args:
    project_iam_policy: Object with a `bindings` iterable. Each binding has a
      `role` string and a `members` collection of IAM-prefixed emails, e.g.
      <Binding members=['serviceAccount:member@thing.iam.gserviceaccount.com'],
               role='roles/somerole'>.
    prefixed_account_email: IAM-prefixed email to look for, compared by exact
      equality against each member.
    roles_set: Collection of role names of interest.

  Returns:
    A set of the roles from roles_set that have a binding listing the account.
  """
  # Check the role first so bindings for unrelated roles are skipped without
  # scanning their member lists; the membership test stops at the first match.
  return {
      binding.role
      for binding in project_iam_policy.bindings
      if binding.role in roles_set
      and prefixed_account_email in binding.members
  }
# Compares the project's IAM policy with the roles in EXPECTED_USER_ROLES and
# EXPECTED_P4SA_ROLES (plus EXPECTED_GCS_SA_ROLES on the alpha track), prints
# what is present and missing, and adds the missing bindings when
# --add-missing is given.
@base.UniverseCompatible
class Authorize(base.Command):
  """Authorize an account for all Transfer Service features."""
  # pylint:disable=line-too-long
  detailed_help = {
      'DESCRIPTION':
          """\
      Authorize a Google account for all Transfer Service features.
      This command provides admin and owner rights for simplicity. If that's
      too much authority for your use case, see custom setups here:
      https://cloud.google.com/storage-transfer/docs/on-prem-set-up
      """,
      'EXAMPLES':
          """\
      To see what Transfer Service IAM roles the account logged into gcloud may
      be missing, run:
        $ {command}
      To add the missing IAM roles, run:
        $ {command} --add-missing
      To check a custom service account for missing roles, run:
        $ {command} --creds-file=path/to/service-account-key.json
      """
  }

  @staticmethod
  def Args(parser):
    """Registers the --creds-file and --add-missing flags on the parser."""
    parser.add_argument(
        '--creds-file',
        help='The path to the creds file for an account to authorize.'
        ' The file should be in JSON format and contain a "type" and'
        ' "client_email", which are automatically generated for most'
        ' creds files downloaded from Google (e.g. service account tokens).'
        ' If this flag is not present, the command authorizes the user'
        ' currently logged into gcloud.')
    parser.add_argument(
        '--add-missing',
        action='store_true',
        help='Add IAM roles necessary to use all Transfer Service'
        ' features to the specified account. By default, this command just'
        ' prints missing roles.')

  @staticmethod
  def _ReadCredsFile(creds_file):
    """Reads the account email and account kind from a JSON creds file.

    Args:
      creds_file: Path to the creds file; `~` and relative paths are expanded.

    Returns:
      A (account_email, is_service_account) tuple, where is_service_account is
      True when the file's "type" equals "service_account".

    Raises:
      ValueError: If the file is not valid JSON, is not a JSON object, or lacks
        the "client_email" or "type" key.
    """
    expanded_file_path = os.path.abspath(os.path.expanduser(creds_file))
    with files.FileReader(expanded_file_path) as file_reader:
      try:
        parsed_creds_file = json.load(file_reader)
        account_email = parsed_creds_file['client_email']
        is_service_account = parsed_creds_file['type'] == 'service_account'
      # TypeError covers valid JSON whose top level is not an object (a list,
      # string, number or null), where key lookup by name is not supported.
      except (ValueError, KeyError, TypeError) as e:
        log.error(e)
        raise ValueError(
            'Invalid creds file format.'
            ' Run command with "--help" flag for more details.'
        ) from e
    return account_email, is_service_account

  @staticmethod
  def _ReportRoles(header_format, display_email, prefixed_email,
                   project_iam_policy, expected_roles):
    """Prints an account's existing and missing roles.

    Args:
      header_format: Format string with two positional fields: the account and
        the list of roles it already has.
      display_email: Account identifier shown in the printed header.
      prefixed_email: IAM-prefixed email matched against policy members.
      project_iam_policy: IAM policy whose bindings are inspected.
      expected_roles: frozenset of roles the account is expected to have.

    Returns:
      A list of (prefixed_email, role) tuples, one per missing role, in sorted
      role order.
    """
    existing_roles = _get_existing_transfer_roles_for_account(
        project_iam_policy, prefixed_email, expected_roles)
    # Sets of strings iterate in an order that can differ between runs, so
    # sort before printing to keep the output stable.
    log.status.Print(
        header_format.format(display_email, sorted(existing_roles)))
    missing_roles = sorted(expected_roles - existing_roles)
    log.status.Print('Missing roles:\n{}'.format(missing_roles))
    return [(prefixed_email, role) for role in missing_roles]

  def Run(self, args):
    """Prints missing Transfer Service roles and optionally grants them.

    Args:
      args: Parsed command arguments; `creds_file` and `add_missing` are read.

    Raises:
      ValueError: If --creds-file names a file that is not valid JSON, is not a
        JSON object, or lacks the "client_email" or "type" key.
    """
    client = apis.GetClientInstance('storagetransfer', 'v1')
    messages = apis.GetMessagesModule('storagetransfer', 'v1')

    if args.creds_file:
      account_email, is_service_account = self._ReadCredsFile(args.creds_file)
    else:
      account_email = properties.VALUES.core.account.Get()
      is_service_account = creds.IsServiceAccountCredentials(creds_store.Load())
    prefixed_account_email = _get_iam_prefixed_email(
        account_email, is_service_account)

    project_id = properties.VALUES.core.project.Get()
    parsed_project_id = projects_util.ParseProject(project_id)
    project_iam_policy = projects_api.GetIamPolicy(parsed_project_id)

    all_missing_role_tuples = self._ReportRoles(
        'User {} has roles:\n{}', account_email, prefixed_account_email,
        project_iam_policy, EXPECTED_USER_ROLES)

    log.status.Print('***')
    transfer_p4sa_email = client.googleServiceAccounts.Get(
        messages.StoragetransferGoogleServiceAccountsGetRequest(
            projectId=project_id)).accountEmail
    prefixed_transfer_p4sa_email = _get_iam_prefixed_email(
        transfer_p4sa_email, is_service_account=True)
    all_missing_role_tuples += self._ReportRoles(
        'Google-managed transfer account {} has roles:\n{}',
        transfer_p4sa_email, prefixed_transfer_p4sa_email,
        project_iam_policy, EXPECTED_P4SA_ROLES)

    # The GCS service account is only checked on the alpha release track.
    if self.ReleaseTrack() is base.ReleaseTrack.ALPHA:
      project_number = projects_util.GetProjectNumber(project_id)
      prefixed_gcs_sa_email = _get_iam_prefiexed_gcs_sa_email(project_number)
      log.status.Print('***')
      all_missing_role_tuples += self._ReportRoles(
          'Google-managed service account {} has roles:\n{}',
          prefixed_gcs_sa_email, prefixed_gcs_sa_email,
          project_iam_policy, EXPECTED_GCS_SA_ROLES)

    # Nothing is missing and nothing was requested: no closing message.
    if not (args.add_missing or all_missing_role_tuples):
      return

    log.status.Print('***')
    if not args.add_missing:
      log.status.Print('Rerun with --add-missing to add missing roles.')
      return
    if not all_missing_role_tuples:
      log.status.Print('No missing roles to add.')
      return

    log.status.Print('Adding roles:\n{}'.format(all_missing_role_tuples))
    projects_api.AddIamPolicyBindings(parsed_project_id,
                                      all_missing_role_tuples)
    log.status.Print('***')
    # Source:
    # https://cloud.google.com/iam/docs/granting-changing-revoking-access
    log.status.Print(
        'Done. Permissions typically take seconds to propagate, but,'
        ' in some cases, it can take up to seven minutes.')