File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/artifacts/util.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility for forming Artifact Registry requests."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import collections
from concurrent import futures
import copy
# TODO(b/142489773) Required because of thread-safety issue with loading python
# modules in the presence of threads.
import encodings.idna # pylint: disable=unused-import
import json
import mimetypes
import multiprocessing
import os
import random
import re
import sys
import time
from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions
from containerregistry.client import docker_name
from containerregistry.client.v2_2 import docker_http
from containerregistry.client.v2_2 import docker_image
from googlecloudsdk.api_lib import artifacts
from googlecloudsdk.api_lib.artifacts import exceptions as ar_exceptions
from googlecloudsdk.api_lib.artifacts import filter_rewriter
from googlecloudsdk.api_lib.cloudresourcemanager import projects_api
from googlecloudsdk.api_lib.container.images import util
from googlecloudsdk.api_lib.util import common_args
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.artifacts import docker_util
from googlecloudsdk.command_lib.artifacts import remote_repo_util
from googlecloudsdk.command_lib.artifacts import requests as ar_requests
from googlecloudsdk.command_lib.artifacts import upgrade_util
from googlecloudsdk.command_lib.projects import util as project_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import console_attr
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.resource import resource_printer
from googlecloudsdk.core.universe_descriptor import universe_descriptor
from googlecloudsdk.core.util import edit
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import parallel
from googlecloudsdk.core.util import platforms
import requests
_INVALID_REPO_NAME_ERROR = (
"Names may only contain lowercase letters, numbers, and hyphens, and must "
"begin with a letter and end with a letter or number.")
_INVALID_RULE_NAME_ERROR = (
"Names may only contain lowercase letters, numbers, hyphens, underscores "
"and dots, and must begin with a letter and end with a letter or number.")
_INVALID_REPO_LOCATION_ERROR = ("GCR repository {} can only be created in the "
"{} multi-region.")
_INVALID_GCR_REPO_FORMAT_ERROR = "GCR repository {} must be of DOCKER format."
_ALLOWED_GCR_REPO_LOCATION = {
"gcr.io": "us",
"us.gcr.io": "us",
"eu.gcr.io": "europe",
"asia.gcr.io": "asia",
}
_REPO_REGEX = "^[a-z]([a-z0-9-]*[a-z0-9])?$"
# https://google.aip.dev/122
_RESOURCE_ID_REGEX = "^[a-z]([a-z0-9._-]*[a-z0-9])?$"
_AR_SERVICE_ACCOUNT = "service-{project_num}@gcp-sa-artifactregistry.{project_prefix}iam.gserviceaccount.com"
_GCR_BUCKETS = {
"us": {
"bucket": "us.artifacts.{}.appspot.com",
"repository": "us.gcr.io",
"location": "us"
},
"europe": {
"bucket": "eu.artifacts.{}.appspot.com",
"repository": "eu.gcr.io",
"location": "europe"
},
"asia": {
"bucket": "asia.artifacts.{}.appspot.com",
"repository": "asia.gcr.io",
"location": "asia"
},
"global": {
"bucket": "artifacts.{}.appspot.com",
"repository": "gcr.io",
"location": "us"
}
}
_REPO_CREATION_HELP_TEXT = """\
Format of the repository. REPOSITORY_FORMAT must be one of:\n
apt
APT package format.
docker
Docker image format.
go
Go module format.
kfp
KFP package format.
maven
Maven package format.
npm
NPM package format.
python
Python package format.
yum
YUM package format.
"""
_REPO_CREATION_HELP_TEXT_BETA = """\
Format of the repository. REPOSITORY_FORMAT must be one of:\n
apt
APT package format.
docker
Docker image format.
googet
GooGet package format.
kfp
KFP package format.
maven
Maven package format.
npm
NPM package format.
python
Python package format.
yum
YUM package format.
"""
_REPO_CREATION_HELP_UPSTREAM_POLICIES = """\
(Virtual Repositories only) is the upstreams for the Virtual Repository.
Example of the file contents:
[
{
"id": "test1",
"repository": "projects/p1/locations/us-central1/repositories/repo1",
"priority": 1
},
{
"id": "test2",
"repository": "projects/p2/locations/us-west2/repositories/repo2",
"priority": 2
}
]
"""
_INVALID_UPSTREAM_POLICY = ("Upstream Policies should contain id, repository "
"and priority.")
def _GetMessagesForResource(resource_ref):
return artifacts.Messages(resource_ref.GetCollectionInfo().api_version)
def _GetClientForResource(resource_ref):
return artifacts.Client(resource_ref.GetCollectionInfo().api_version)
def _IsValidRepoName(repo_name):
return re.match(_REPO_REGEX, repo_name) is not None
def _IsValidRuleName(rule_name):
return re.match(_RESOURCE_ID_REGEX, rule_name) is not None
def GetProject(args):
"""Gets project resource from either argument flag or attribute."""
return args.project or properties.VALUES.core.project.GetOrFail()
def GetParent(project, location):
parent = "{}".format(project)
if location is not None:
parent = f"{project}/locations/{location}"
return parent
def GetRepo(args):
"""Gets repository resource from either argument flag or attribute."""
return args.repository or properties.VALUES.artifacts.repository.GetOrFail()
def GetLocation(args):
"""Gets location resource from either argument flag or attribute."""
return args.location or properties.VALUES.artifacts.location.GetOrFail()
def GetLocationList(args):
return ar_requests.ListLocations(GetProject(args), args.page_size)
def ValidateGcrRepo(repo_name, repo_format, location, docker_format):
"""Validates input for a gcr.io repository."""
expected_location = _ALLOWED_GCR_REPO_LOCATION.get(repo_name, "")
if location != expected_location:
raise ar_exceptions.InvalidInputValueError(
_INVALID_REPO_LOCATION_ERROR.format(repo_name, expected_location))
if repo_format != docker_format:
raise ar_exceptions.InvalidInputValueError(
_INVALID_GCR_REPO_FORMAT_ERROR.format(repo_name))
def AppendRepoDataToRequest(repo_ref, repo_args, request):
"""Adds repository data to CreateRepositoryRequest or UpdateRepositoryRequest."""
repo_name = repo_ref.repositoriesId
location = GetLocation(repo_args)
messages = _GetMessagesForResource(repo_ref)
docker_format = messages.Repository.FormatValueValuesEnum.DOCKER
repo_format = messages.Repository.FormatValueValuesEnum(
repo_args.repository_format.upper())
if repo_name in _ALLOWED_GCR_REPO_LOCATION:
ValidateGcrRepo(repo_name, repo_format, location, docker_format)
elif not _IsValidRepoName(repo_ref.repositoriesId):
raise ar_exceptions.InvalidInputValueError(_INVALID_REPO_NAME_ERROR)
if remote_repo_util.IsRemoteRepoRequest(repo_args):
request = remote_repo_util.AppendRemoteRepoConfigToRequest(
messages, repo_args, request
)
if hasattr(repo_args, "alternative_hostname"): # only v1 has this
if repo_args.alternative_hostname:
request.repository.networkConfig.alternativeHostname = (
repo_args.alternative_hostname
)
if repo_args.alternative_hostname_path_prefix:
request.repository.networkConfig.prefix = (
repo_args.alternative_hostname_path_prefix
)
if repo_args.alternative_hostname_default:
request.repository.networkConfig.isDefault = True
request.repository.name = repo_ref.RelativeName()
request.repositoryId = repo_ref.repositoriesId
request.repository.format = repo_format
return request
def AppendUpstreamPoliciesToRequest(repo_ref, repo_args, request):
"""Adds upstream policies to CreateRepositoryRequest."""
messages = _GetMessagesForResource(repo_ref)
if repo_args.upstream_policy_file:
if isinstance(
request,
messages.ArtifactregistryProjectsLocationsRepositoriesPatchRequest,
):
# Clear the updateMask for update request, so AR will replace all old
# policies with policies from the file.
request.updateMask = None
content = console_io.ReadFromFileOrStdin(
repo_args.upstream_policy_file, binary=False
)
policies = json.loads(content)
request.repository.virtualRepositoryConfig = (
messages.VirtualRepositoryConfig()
)
request.repository.virtualRepositoryConfig.upstreamPolicies = []
for policy in policies:
if all(key in policy for key in ("id", "priority", "repository")):
p = messages.UpstreamPolicy(
id=policy["id"],
priority=policy["priority"],
repository=policy["repository"],
)
request.repository.virtualRepositoryConfig.upstreamPolicies.append(p)
else:
raise ar_exceptions.InvalidInputValueError(_INVALID_UPSTREAM_POLICY)
return request
def AddAdditionalArgs():
"""Adds additional flags."""
return UpstreamsArgs() + RepoFormatArgs() + remote_repo_util.Args()
def UpstreamsArgs():
"""Adds the upstream-policy-file flag."""
# Is required because the upload operation requires the type conversion that
# should be done by a function. The "File" metavar is also usually handled by
# custom functions.
return [
base.Argument(
"--upstream-policy-file",
metavar="FILE",
help=_REPO_CREATION_HELP_UPSTREAM_POLICIES)
]
def RepoFormatArgs():
"""Adds the repository-format flag."""
# We need to do this because the declarative framework doesn't support
# hiding an enum from the help text.
return [
base.Argument(
"--repository-format", required=True, help=_REPO_CREATION_HELP_TEXT)
]
def AddRepositoryFormatArgBeta():
"""Adds the repository-format flag."""
# We need to do this because the declarative framework doesn't support
# hiding an enum from the help text.
return [
base.Argument(
"--repository-format",
required=True,
help=_REPO_CREATION_HELP_TEXT_BETA)
]
def AddTargetForAttachments(unused_repo_ref, repo_args, request):
"""If the target field is set, adds it to the server side request.
Args:
unused_repo_ref: Repo reference input.
repo_args: User input arguments.
request: ListAttachments request.
Returns:
ListAttachments request.
"""
if not repo_args.target:
return request
target = repo_args.target
try:
docker_version = docker_util.ParseDockerVersionStr(repo_args.target)
target = docker_version.GetVersionName()
except ar_exceptions.InvalidInputValueError:
pass
request.filter = f'target="{target}"'
return request
def AddTypeForAttachments(unused_repo_ref, repo_args, request):
"""If the type field is set, add it to the server side request.
Args:
unused_repo_ref: Repo reference input.
repo_args: User input arguments.
request: ListAttachments request.
Returns:
ListAttachments request.
"""
if not repo_args.attachment_type:
return request
if request.filter:
request.filter += f' AND type="{repo_args.attachment_type}"'
else:
request.filter = f'type="{repo_args.attachment_type}"'
return request
def _GetServiceAgent(project_id):
"""Returns the service agent for the given project."""
project_num = project_util.GetProjectNumber(project_id)
project_prefix = (
universe_descriptor.GetUniverseDomainDescriptor().project_prefix
)
if project_prefix:
project_prefix = project_prefix + "."
return _AR_SERVICE_ACCOUNT.format(
project_num=project_num, project_prefix=project_prefix
)
def CheckServiceAccountPermission(unused_repo_ref, repo_args, request):
"""Checks and grants key encrypt/decrypt permission for service account.
Checks if Artifact Registry service account has encrypter/decrypter or owner
role for the given key. If not, prompts users to grant key encrypter/decrypter
permission to the service account. Operation would fail if users do not grant
the permission.
Args:
unused_repo_ref: Repo reference input.
repo_args: User input arguments.
request: Create repository request.
Returns:
Create repository request.
"""
if not repo_args.kms_key:
return request
# Best effort to check if AR's service account has permission to use the key;
# ignore if the caller identity does not have enough permission to check.
try:
service_account = _GetServiceAgent(GetProject(repo_args))
policy = ar_requests.GetCryptoKeyPolicy(repo_args.kms_key)
for binding in policy.bindings:
if "serviceAccount:" + service_account in binding.members and (
binding.role == "roles/cloudkms.cryptoKeyEncrypterDecrypter" or
binding.role == "roles/owner"):
return request
grant_permission = console_io.PromptContinue(
prompt_string=(
"\nGrant the Artifact Registry Service Account {service_account} "
"permission to encrypt/decrypt with the selected key [{key_name}]"
.format(service_account=service_account, key_name=repo_args.kms_key)
)
)
if not grant_permission:
return request
try:
ar_requests.AddCryptoKeyPermission(repo_args.kms_key,
"serviceAccount:" + service_account)
# We have checked the existence of the key when checking IAM bindings
# So all 400s should be because the service account is problematic.
# We are moving the permission check to the backend fairly soon anyway.
except apitools_exceptions.HttpBadRequestError:
msg = (
"The Artifact Registry service account might not exist, manually "
"create the service account.\nLearn more: "
"https://cloud.google.com/artifact-registry/docs/cmek")
raise ar_exceptions.ArtifactRegistryError(msg)
log.status.Print(
"Added Cloud KMS CryptoKey Encrypter/Decrypter Role to [{key_name}]"
.format(key_name=repo_args.kms_key))
except apitools_exceptions.HttpForbiddenError:
return request
return request
def DeleteVersionTags(ver_ref, ver_args, request):
"""Deletes tags associate with the specified version."""
if not ver_args.delete_tags:
return request
client = _GetClientForResource(ver_ref)
messages = _GetMessagesForResource(ver_ref)
escaped_pkg = ver_ref.packagesId.replace("/", "%2F").replace("+", "%2B")
escaped_pkg = escaped_pkg.replace("^", "%5E")
package = resources.REGISTRY.Create(
"artifactregistry.projects.locations.repositories.packages",
projectsId=ver_ref.projectsId,
locationsId=ver_ref.locationsId,
repositoriesId=ver_ref.repositoriesId,
packagesId=escaped_pkg)
tag_list = ar_requests.ListTags(client, messages,
package.RelativeName())
for tag in tag_list:
if tag.version != request.name:
continue
ar_requests.DeleteTag(client, messages, tag.name)
return request
def AppendTagDataToRequest(tag_ref, tag_args, request):
"""Adds tag data to CreateTagRequest."""
parts = request.parent.split("/")
pkg_path = "/".join(parts[:len(parts) - 2])
request.parent = pkg_path
messages = _GetMessagesForResource(tag_ref)
tag = messages.Tag(
name=tag_ref.RelativeName(),
version=pkg_path + "/versions/" + tag_args.version)
request.tag = tag
request.tagId = tag_ref.tagsId
return request
def SetTagUpdateMask(tag_ref, tag_args, request):
"""Sets update mask to UpdateTagRequest."""
messages = _GetMessagesForResource(tag_ref)
parts = request.name.split("/")
pkg_path = "/".join(parts[:len(parts) - 2])
tag = messages.Tag(
name=tag_ref.RelativeName(),
version=pkg_path + "/versions/" + tag_args.version)
request.tag = tag
request.updateMask = "version"
return request
def EscapePackageName(pkg_ref, unused_args, request):
"""Escapes slashes and pluses in package name for ListVersionsRequest."""
escaped_pkg = pkg_ref.packagesId.replace("/", "%2F").replace("+", "%2B")
escaped_pkg = escaped_pkg.replace("^", "%5E")
request.parent = "{}/packages/{}".format(
pkg_ref.Parent().RelativeName(),
escaped_pkg)
return request
def EscapePackageStr(pkg: str):
"""Escapes slashes and pluses in package name of type string."""
return pkg.replace("/", "%2F").replace("+", "%2B").replace("^", "%5E")
def AppendSortingToRequest(unused_ref, ver_args, request):
"""Adds order_by and page_size parameters to the request."""
order_by = common_args.ParseSortByArg(ver_args.sort_by)
set_limit = True
# Multi-ordering is not supported yet on backend.
if order_by is not None:
if "," not in order_by:
request.orderBy = order_by
else:
set_limit = False
if (ver_args.limit is not None and ver_args.filter is None and set_limit):
request.pageSize = ver_args.limit
# Otherwise request gets overridden somewhere down the line.
ver_args.page_size = ver_args.limit
return request
def UnescapePackageName(response, unused_args):
"""Unescapes slashes and pluses in package name from ListPackagesResponse."""
ret = []
for ver in response:
ver.name = os.path.basename(ver.name)
ver.name = ver.name.replace("%2F", "/").replace("%2B", "+")
ver.name = ver.name.replace("%5E", "^")
ret.append(ver)
return ret
def AppendRuleDataToRequest(rule_ref, unused_args, request):
"""Adds rule data to CreateRuleRequest."""
parts = request.parent.split("/")
request.parent = "/".join(parts[: len(parts) - 2])
if not _IsValidRuleName(rule_ref.rulesId):
raise ar_exceptions.InvalidInputValueError(_INVALID_RULE_NAME_ERROR)
request.ruleId = rule_ref.rulesId
return request
def AppendParentInfoToListReposResponse(response, args):
"""Adds log to clarify parent resources for ListRepositoriesRequest."""
log.status.Print("Listing items under project {}, location {}.\n".format(
GetProject(args), GetLocation(args)))
return response
def AppendParentInfoToListPackagesResponse(response, args):
"""Adds log to clarify parent resources for ListPackagesRequest."""
log.status.Print(
"Listing items under project {}, location {}, repository {}.\n".format(
GetProject(args), GetLocation(args), GetRepo(args)))
return response
def AppendParentInfoToListVersionsAndTagsResponse(response, args):
"""Adds log to clarify parent resources for ListVersions or ListTags."""
log.status.Print(
"Listing items under project {}, location {}, repository {}, "
"package {}.\n".format(
GetProject(args), GetLocation(args), GetRepo(args), args.package))
return response
def AppendParentInfoToListRulesResponse(response, args):
"""Adds log to clarify parent resources for ListRulesRequest."""
log.status.Print(
f"Listing items under project {GetProject(args)}, location"
f" {GetLocation(args)}, repository {GetRepo(args)}.\n"
)
return response
def GetGCRRepos(buckets, project):
"""Gets a list of GCR repositories given a list of GCR bucket names."""
existing_buckets = GetExistingGCRBuckets(buckets, project)
def RepoMsg(bucket):
return bucket["repository"]
return list(map(RepoMsg, existing_buckets))
def GetExistingGCRBuckets(buckets, project):
"""Gets the list of GCR bucket names that exist in the project."""
existing_buckets = []
project_id_for_bucket = project
if ":" in project:
domain, project_id = project.split(":")
project_id_for_bucket = "{}.{}.a".format(project_id, domain)
for bucket in buckets.values():
try:
ar_requests.TestStorageIAMPermission(
bucket["bucket"].format(project_id_for_bucket), project)
existing_buckets.append(bucket)
except apitools_exceptions.HttpNotFoundError:
continue
return existing_buckets
def ListRepositories(args):
"""Lists repositories in a given project.
If no location value is specified, list repositories across all locations.
Args:
args: User input arguments.
Returns:
List of repositories.
"""
project = GetProject(args)
location = args.location or properties.VALUES.artifacts.location.Get()
loc_paths = []
if location and location != "all":
log.status.Print("Listing items under project {}, location {}.\n".format(
project, location))
loc_paths.append("projects/{}/locations/{}".format(project, location))
else:
location_list = ar_requests.ListLocations(project)
log.status.Print(
"Listing items under project {}, across all locations.\n".format(
project))
loc_paths.extend([
"projects/{}/locations/{}".format(project, loc) for loc in location_list
])
pool_size = len(loc_paths) if loc_paths else 1
if platforms.OperatingSystem.Current() is platforms.OperatingSystem.WINDOWS:
pool_size = multiprocessing.cpu_count() if loc_paths else 1
page_size = args.page_size
order_by = common_args.ParseSortByArg(args.sort_by)
_, server_filter = filter_rewriter.Rewriter().Rewrite(args.filter)
if order_by is not None:
if "," in order_by:
# Multi-ordering is not supported yet on backend, fall back to client-side
# sort-by.
order_by = None
if args.limit is not None and args.filter is not None:
if server_filter is not None:
# Use server-side paging with server-side filtering.
page_size = args.limit
args.page_size = args.limit
else:
# Fall back to client-side paging with client-side filtering.
page_size = None
def ListLocationRepos(
project, page_size=None, order_by=None, server_filter=None
):
"""Lists repositories in a given project and location, and if an error occurs, returns an empty list."""
try:
return ar_requests.ListRepositories(
project,
page_size=page_size,
order_by=order_by,
server_filter=server_filter,
)
except apitools_exceptions.HttpError as e:
if e.status_code > 500:
log.warning(
"Failed to list repositories for project {}".format(
project
)
)
return []
else:
raise
def ListRepos(page_size=None, order_by=None, server_filter=None):
pool = parallel.GetPool(pool_size)
try:
pool.Start()
results = pool.Map(
lambda x: ListLocationRepos(
x,
page_size=page_size,
order_by=order_by,
server_filter=server_filter,
),
loc_paths,
)
except parallel.MultiError as e:
if server_filter or order_by:
for err in e.errors:
if err.status_code == 400:
raise apitools_exceptions.HttpBadRequestError(
err.content, err.status_code, err.url
)
error_set = set(err.content for err in e.errors)
msg = "\n".join(error_set)
raise ar_exceptions.ArtifactRegistryError(msg)
finally:
pool.Join()
return results
repos = []
server_args = {
"server_filter": server_filter,
"page_size": page_size,
"order_by": order_by
}
server_args_skipped, result = RetryOnInvalidArguments(
ListRepos,
**server_args)
for sublist in result:
repos.extend(sublist)
# If server-side filter or sort-by is parsed correctly and the request
# succeeds, remove the client-side filter and sort-by.
if not server_args_skipped:
if server_args["order_by"]:
args.sort_by = None
if (
server_args["server_filter"]
and server_args["server_filter"] == args.filter
):
args.filter = None
return repos
def RetryOnInvalidArguments(func, **kwargs):
"""Retry the request on invalid arguments error.
If the request fails with 400 because of unsupported server-side filter or
sort-by, retry the request with no filter or sort-by.
Args:
func: Retry function.
**kwargs: User input arguments.
Returns:
retried: If the request is retried without server-side filter or sort-by.
results: List of results.
"""
try:
results = func(**kwargs)
return False, results
except apitools_exceptions.HttpBadRequestError as e:
# If the error is a FAILED_PRECONDITION, do not retry the request.
if hasattr(e, "content") and e.content:
try:
content = json.loads(e.content)
if content["error"]["status"] == "FAILED_PRECONDITION":
raise e
except (json.JSONDecodeError, KeyError, TypeError):
pass
if kwargs["server_filter"]:
kwargs["server_filter"] = None
# If server-side filter is not supported, discard the server-side paging
# in retry.
if kwargs.get("page_size"):
kwargs["page_size"] = None
if kwargs.get("limit"):
kwargs["limit"] = None
if kwargs.get("order_by"):
kwargs["order_by"] = None
return True, func(**kwargs)
except Exception as e:
raise ar_exceptions.ArtifactRegistryError(e)
def AddEncryptionLogToRepositoryInfo(response, unused_args):
"""Adds encryption info log to repository info."""
if response.kmsKeyName:
log.status.Print("Encryption: Customer-managed key")
else:
log.status.Print("Encryption: Google-managed key")
return response
def AddRegistryBaseToRepositoryInfo(response, unused_args):
"""Adds the base URL of the repo for registry operations to repository info."""
if not response.registryUri:
repo_name = resources.REGISTRY.ParseRelativeName(
response.name,
collection="artifactregistry.projects.locations.repositories",
)
log.status.Print(
"Registry URL: {}-{}.pkg.dev/{}/{}".format(
repo_name.locationsId,
str(response.format).lower(),
repo_name.projectsId.replace(":", "/"),
repo_name.repositoriesId,
)
)
return response
def ConvertBytesToMB(response, unused_args):
if response.sizeBytes is not None:
log.status.Print("Repository Size: {0:.3f}MB".format(response.sizeBytes /
1e6))
else:
log.status.Print("Repository Size: {0:.3f}MB".format(0))
response.sizeBytes = None
return response
def EscapePackageNameHook(ref, unused_args, req):
"""Escapes slashes and pluses from request names."""
escaped_pkg = ref.packagesId.replace("/", "%2F").replace("+", "%2B")
escaped_pkg = escaped_pkg.replace("^", "%5E")
package = resources.REGISTRY.Create(
"artifactregistry.projects.locations.repositories.packages",
projectsId=ref.projectsId,
locationsId=ref.locationsId,
repositoriesId=ref.repositoriesId,
packagesId=escaped_pkg)
req.name = package.RelativeName()
return req
def EscapeTagNameHook(ref, unused_args, req):
"""Escapes slashes and pluses from request names."""
escaped_pkg = ref.packagesId.replace("/", "%2F").replace("+", "%2B")
escaped_pkg = escaped_pkg.replace("^", "%5E")
tag = resources.REGISTRY.Create(
"artifactregistry.projects.locations.repositories.packages.tags",
projectsId=ref.projectsId,
locationsId=ref.locationsId,
repositoriesId=ref.repositoriesId,
packagesId=escaped_pkg,
tagsId=ref.tagsId.replace("/", "%2F").replace("+", "%2B"))
req.name = tag.RelativeName()
return req
def EscapeVersionNameHook(ref, unused_args, req):
"""Escapes slashes and pluses from request names."""
escaped_pkg = ref.packagesId.replace("/", "%2F").replace("+", "%2B")
escaped_pkg = escaped_pkg.replace("^", "%5E")
escaped_ver = ref.versionsId.replace("/", "%2F").replace("+", "%2B")
escaped_ver = escaped_ver.replace("^", "%5E")
version = resources.REGISTRY.Create(
"artifactregistry.projects.locations.repositories.packages.versions",
projectsId=ref.projectsId,
locationsId=ref.locationsId,
repositoriesId=ref.repositoriesId,
packagesId=escaped_pkg,
versionsId=escaped_ver,
)
req.name = version.RelativeName()
return req
gcr_base = getattr(properties.VALUES.artifacts, "gcr_host").Get()
host_seperator = "-" if "-" in gcr_base else "."
gcr_repos = [
{"repository": "gcr.io", "location": "us", "host": f"{gcr_base}"},
{
"repository": "us.gcr.io",
"location": "us",
"host": f"us{host_seperator}{gcr_base}",
},
{
"repository": "asia.gcr.io",
"location": "asia",
"host": f"asia{host_seperator}{gcr_base}",
},
{
"repository": "eu.gcr.io",
"location": "europe",
"host": f"eu{host_seperator}{gcr_base}",
},
]
def GetMultiProjectRedirectionEnablementReport(projects):
"""Prints a redirection enablement report and returns mis-configured repos.
This checks all the GCR repositories in the supplied project and checks if
they each have a repository in Artifact Registry create to be the redirection
target. It prints a report as it validates.
Args:
projects: The projects to validate
Returns:
A list of the GCR repos that do not have a redirection repo configured in
Artifact Registry.
"""
missing_repos = {}
if not projects:
return missing_repos
repo_report = []
con = console_attr.GetConsoleAttr()
# For each gcr repo in a location that our environment supports,
# is there an associated repo in AR?
for project in projects:
report_line = [project, 0]
p_repos = []
for gcr_repo in gcr_repos:
ar_repo_name = "projects/{}/locations/{}/repositories/{}".format(
project, gcr_repo["location"], gcr_repo["repository"]
)
try:
ar_requests.GetRepository(ar_repo_name)
except apitools_exceptions.HttpNotFoundError:
report_line[1] += 1
p_repos.append(gcr_repo)
repo_report.append(report_line)
if p_repos:
missing_repos[project] = p_repos
log.status.Print("Project Repository Report:\n")
printer = resource_printer.Printer("table", out=log.status)
printer.AddHeading([
con.Emphasize("Project", bold=True),
con.Emphasize("Missing Artifact Registry Repos to Create", bold=True),
])
for line in repo_report:
printer.AddRecord(line)
printer.Finish()
log.status.Print()
return missing_repos
def GetRedirectionEnablementReport(project):
"""Prints a redirection enablement report and returns mis-configured repos.
This checks all the GCR repositories in the supplied project and checks if
they each have a repository in Artifact Registry create to be the redirection
target. It prints a report as it validates.
Args:
project: The project to validate
Returns:
A list of the GCR repos that do not have a redirection repo configured in
Artifact Registry.
"""
missing_repos = []
repo_report = []
# report_line = []
con = console_attr.GetConsoleAttr()
location = getattr(properties.VALUES.artifacts, "location").Get()
# For each gcr repo in a location that our environment supports,
# is there an associated repo in AR?
for gcr_repo in gcr_repos:
# For gcr.io, redirection affects every location
if gcr_base != "gcr.io" and location and location != gcr_repo["location"]:
continue
report_line = [gcr_repo["host"], gcr_repo["location"]]
ar_repo_name = "projects/{}/locations/{}/repositories/{}".format(
project, gcr_repo["location"], gcr_repo["repository"])
try:
ar_repo = ar_requests.GetRepository(ar_repo_name)
report_line.append(con.Colorize(ar_repo.name, "green"))
except apitools_exceptions.HttpNotFoundError:
report_line.append(
con.Colorize(
'None Found. Can create repo named "{}"'.format(
gcr_repo["repository"]
),
"yellow",
)
)
missing_repos.append(gcr_repo)
repo_report.append(report_line)
log.status.Print(f"Repository report for {project}:\n")
printer = resource_printer.Printer("table", out=log.status)
printer.AddHeading([
con.Emphasize("Container Registry Host", bold=True),
con.Emphasize("Location", bold=True),
con.Emphasize("Artifact Registry Repository", bold=True)
])
for line in repo_report:
printer.AddRecord(line)
printer.Finish()
log.status.Print()
return missing_repos
def GetExistingRepos(project):
"""Gets the already created repos for the given project."""
found_repos = []
location = getattr(properties.VALUES.artifacts, "location").Get()
for gcr_repo in gcr_repos:
if gcr_base != "gcr.io" and location and location != gcr_repo["location"]:
continue
ar_repo_name = "projects/{}/locations/{}/repositories/{}".format(
project, gcr_repo["location"], gcr_repo["repository"]
)
try:
ar_requests.GetRepository(ar_repo_name)
found_repos.append(gcr_repo)
except apitools_exceptions.HttpNotFoundError:
continue
return found_repos
# TODO(b/261183749): Remove modify_request_hook when singleton resource args
# are enabled in declarative.
def UpdateSettingsResource(unused_ref, unused_args, req):
req.name = req.name + "/projectSettings"
return req
def GetVPCSCConfig(unused_ref, args):
project = GetProject(args)
location = GetLocation(args)
return ar_requests.GetVPCSCConfig(project, location)
def AllowVPCSCConfig(unused_ref, args):
project = GetProject(args)
location = GetLocation(args)
return ar_requests.AllowVPCSCConfig(project, location)
def DenyVPCSCConfig(unused_ref, args):
project = GetProject(args)
location = GetLocation(args)
return ar_requests.DenyVPCSCConfig(project, location)
def LogUserPermissionDeniedError(project):
"""Logs a message about how to grant the user permission to perform migration steps.
Args:
project: The project missing permission
"""
user = properties.VALUES.core.account.Get()
if user.endswith("gserviceaccount.com"):
prefix = "serviceAccount"
else:
prefix = "user"
con = console_attr.GetConsoleAttr()
log.status.Print(
con.Emphasize(
"\nYou can get permission to perform all migration steps if a project"
" owner grants you"
" roles/artifactregistry.containerRegistryMigrationAdmin:",
bold=True,
),
)
log.status.Print(
f" gcloud projects add-iam-policy-binding {project} "
f"--member={prefix}:{user} --role='roles/artifactregistry.containerRegistryMigrationAdmin'\n"
.format(prefix=prefix, user=user),
)
def GetRedirectionStates(projects):
"""Gets the redirection states for the given projects.
Args:
projects: The projects to get the redirection states for
Returns:
A dictionary of project to redirection state.
raises:
apitools_exceptions.HttpForbiddenError: If the user does not have permission
to get the redirection state for a project.
"""
env = "prod"
endpoint_property = getattr(
properties.VALUES.api_endpoint_overrides, "artifactregistry"
)
old_endpoint = endpoint_property.Get()
if old_endpoint and "staging" in old_endpoint:
env = "staging"
# Staging uses prod redirect endpoint
# gcloud-disable-gdu-domain
endpoint_property.Set("https://artifactregistry.googleapis.com/")
redirection_states = {}
try:
for project in projects:
try:
redirection_states[project] = ar_requests.GetProjectSettings(
project
).legacyRedirectionState
except apitools_exceptions.HttpForbiddenError as e:
LogUserPermissionDeniedError(project)
raise e
finally:
if env == "staging":
endpoint_property.Set(old_endpoint)
return redirection_states
def SetRedirectionStatus(project, status, pull_percent=None):
"""Sets the redirection status for the given project."""
endpoint_property = getattr(
properties.VALUES.api_endpoint_overrides, "artifactregistry"
)
old_endpoint = endpoint_property.Get()
env = "prod"
try:
if old_endpoint and "staging" in old_endpoint:
env = "staging"
# Staging uses prod redirect endpoint
# gcloud-disable-gdu-domain
endpoint_property.Set("https://artifactregistry.googleapis.com/")
ar_requests.SetUpgradeRedirectionState(project, status, pull_percent)
except apitools_exceptions.HttpForbiddenError as e:
con = console_attr.GetConsoleAttr()
match = re.search("requires (.*) to have storage.objects.", str(e))
if not match:
LogUserPermissionDeniedError(project)
raise
log.status.Print(
con.Colorize("\nERROR:", "red")
+ " The Artifact Registry service account doesn't have access to"
" {project} for copying images\nThe following command will grant the"
" necessary access (may take a few minutes):\n gcloud projects"
" add-iam-policy-binding {project} --member='serviceAccount:{p4sa}'"
" --role='roles/storage.objectViewer'\n".format(
p4sa=match[1], project=project
),
)
return False
finally:
if env == "staging":
endpoint_property.Set(old_endpoint)
return True
def RecommendAuthChange(
policy_addition,
existing_policy,
location,
project,
repo,
failures,
pkg_dev=False,
output_iam_policy_dir=None,
):
"""Prompts the user to possibly change the repository's iam policy."""
con = console_attr.GetConsoleAttr()
if existing_policy.bindings:
etag = existing_policy.etag
# Strip all non-binding info from existing policy. By default setIamPolicy
# only uses bindings and etag
existing_policy_bindings = encoding.MessageToDict(existing_policy)[
"bindings"
]
existing_string = yaml.dump({"bindings": existing_policy_bindings})
# Remove the opening "bindings:" line from the new string
new_string = yaml.dump(encoding.MessageToDict(policy_addition)).split(
"\n", 1
)[1]
if new_string:
string_policy = (
f"# Existing repository policy:\n{existing_string}\n# New"
f" additions:\n{new_string}"
)
else:
string_policy = (
f"# Existing repository policy:\n{existing_string}\n# No new bindings"
" added"
)
else:
d = encoding.MessageToDict(policy_addition)
string_policy = yaml.dump(d)
if not d:
string_policy += "\n# No bindings needed"
etag = ""
warning_message = (
f"Generated bindings for {project}/{repo} may be"
" insufficient because you do not have access to analyze IAM for the"
f" following resources: {failures}"
"\nSee"
" https://cloud.google.com/policy-intelligence/docs/analyze-iam-policies#required-permissions"
)
if output_iam_policy_dir:
log.status.Print(f"\nWriting bindings for {project}/{repo}...")
if failures:
log.status.Print(f"{con.Colorize('Warning:', 'red')} {warning_message}")
commented_warning = "# " + "\n# ".join(warning_message.split("\n"))
string_policy = f"{commented_warning}\n\n{string_policy}"
outfile = os.path.join(output_iam_policy_dir, project, f"{repo}.yaml")
# WriteFileContents calls ExpandHomeDir internally only for the path,
# which causes weird errors if we don't pre-expand it
files.WriteFileContents(
files.ExpandHomeDir(outfile), string_policy, create_path=True
)
return True
log.status.Print(
con.Emphasize(
"\nPotential IAM change for {} repository in project {}:\n".format(
repo, project
),
bold=True,
)
)
log.status.Print(string_policy)
message = (
"This IAM policy will grant users the ability to perform all actions in"
" Artifact Registry that they can currently perform in Container"
" Registry. This policy may allow access that was previously prevented by"
" deny policies or IAM conditions."
)
if failures:
message += f"\n\n{con.Colorize('Warning:', 'red')} {warning_message}\n\n"
if not console_io.CanPrompt():
log.status.Print(message)
log.status.Print(
"\nPrompting is disabled. To make interactive iam changes,"
" enable prompting. Otherwise, manually add any missing"
" Artifact Registry permissions and rerun using"
" --skip-iam-update."
)
edited = False
c = console_attr.GetConsoleAttr()
while True:
choices = []
options = []
if pkg_dev:
options = [
"Apply {} policy to the {}/{} Artifact Registry repository".format(
"edited" if edited else "above", project, repo
),
"Edit policy",
"Do not copy permissions for this repo",
"Exit",
]
choices = ["apply", "edit", "skip", "exit"]
else:
options = [
"Apply {} policy to the {}/{} Artifact Registry repository".format(
"edited" if edited else "above", project, repo
)
+ c.Colorize(" (preserves accesss for GCR users)", "green"),
"Edit policy",
"Do not copy permissions for this repo"
+ c.Colorize(
f" (users may lose access to {repo}/{project.replace(':', '/')})",
"red",
),
"Skip permission copying for all remaining repos"
+ c.Colorize(
" (users may lose access to all remaining repos)", "red"
),
"Exit",
]
choices = ["apply", "edit", "skip", "skip_all", "exit"]
option = console_io.PromptChoice(
message=message,
options=options,
default=1,
)
if option < 0 or option >= len(choices):
raise ValueError(f"Unknown option: {option}")
if choices[option] == "apply":
log.status.Print(
"Applying policy to repository {}/{}".format(project, repo)
)
new_binding = encoding.PyValueToMessage(
ar_requests.GetMessages().Policy, yaml.load(string_policy)
)
if etag:
new_binding.etag = etag
try:
ar_requests.SetIamPolicy(
"projects/{}/locations/{}/repositories/{}".format(
project, location, repo
),
new_binding,
)
return True
except apitools_exceptions.HttpError as e:
log.status.Print(
"\nFailed to update iam policy:\n{}\n".format(
json.loads(e.content)["error"]["message"]
)
)
raise e
elif choices[option] == "edit":
string_policy = edit.OnlineEdit(string_policy)
message = con.Emphasize(
"\nEdited policy:", bold=True
) + "\n\n{}\n".format(string_policy)
edited = True
continue
# Skip policy for this repo
elif choices[option] == "skip":
return True
# Skip policy for all repos
elif choices[option] == "skip_all":
return False
# Exit
elif choices[option] == "exit":
raise console_io.OperationCancelledError()
else:
raise ValueError(f"Unknown choice: {choices[option]}")
def SetupAuthForProject(
project,
repos,
repos_with_buckets,
output_iam_policy_dir=None,
input_iam_policy_dir=None,
use_analyze=True,
):
"""Sets up auth for all repos in the given project."""
diffs_found = False
for repo in repos:
has_bucket = repo["repository"] in repos_with_buckets
repo_diffs, continue_auth_check = SetupAuthForRepository(
project,
project,
repo["repository"],
repo,
has_bucket,
output_iam_policy_dir=output_iam_policy_dir,
input_iam_policy_dir=input_iam_policy_dir,
use_analyze=use_analyze,
)
if repo_diffs:
diffs_found = True
if not continue_auth_check:
return diffs_found, False
if not diffs_found and not input_iam_policy_dir:
con = console_attr.GetConsoleAttr()
log.status.Print(
con.Colorize("OK: ", "green")
+ "All Container Registry repositories have equivalent Artifact"
" Registry permissions for project {}".format(project)
)
return diffs_found, True
def WarnNoAuthGenerated(pkg_dev=False):
if pkg_dev:
warning = ""
else:
warning = " If you continue, users may lose access to *gcr.io repositories."
console_io.PromptContinue(
"Cannot generate a new IAM policy because you do not have permission to"
" view existing policies. See"
" https://cloud.google.com/policy-intelligence/docs/analyze-iam-policies#required-permissions"
f" for required permissions.{warning}",
"Continue without updating IAM policy?",
cancel_on_no=True,
)
def CalculateMissingAuth(gcr_auth, ar_non_repo_auth, ar_repo_policy):
"""Calculates auth that should be added to a Repository to match GCR auth."""
missing_auth = collections.defaultdict(set)
ar_repo_map = upgrade_util.map_from_policy(ar_repo_policy)
collections.defaultdict(set)
for role, gcr_members in gcr_auth.items():
missing_auth[role] = gcr_members.difference(ar_non_repo_auth[role])
missing_auth[role] = missing_auth[role].difference(ar_repo_map[role])
# Remove GCR/AR service accounts. These will almost always be there but
# aren't needed for compatibility
missing_auth[role] = set(
filter(
lambda member: not member.endswith(
"@containerregistry.iam.gserviceaccount.com"
)
and not member.endswith(
"gcp-sa-artifactregistry.iam.gserviceaccount.com"
)
and not member.endswith(
"artifact-registry-same-project-copier@system.gserviceaccount.com"
),
missing_auth[role],
)
)
if not missing_auth[role]:
del missing_auth[role]
return missing_auth
def SetupAuthForRepository(
gcr_project,
ar_project,
host,
repo,
has_bucket,
pkg_dev=False,
output_iam_policy_dir=None,
input_iam_policy_dir=None,
use_analyze=True,
):
"""Checks permissions for a repository and prompts for changes if any is missing.
Checks permission for a repository and provides a list of users/roles that had
permissions in GCR but are missing equivalent roles in AR. Prompts the user to
add these roles, edit them, or keep permissions the same.
Args:
gcr_project: The GCR project
ar_project: The AR project
host: The GCR host (like gcr.io)
repo: The AR repo being copied to
has_bucket: Whether a GCR bucket exists for this repository
pkg_dev: If true, this is for a single pkg.dev repo (prompts are different)
output_iam_policy_dir: If set, output iam files to this dir
input_iam_policy_dir: If set, use iam files from this dir
use_analyze: If true, use AnalyzeIamPolicy to generate the policy
Returns:
A tuple of (diffs_found, should_continue) where diffs_found is true if
there were auth diffs found between GCR + AR and should_continue is true
if the tool should continue recommending auth changes for subsequent
repos.
"""
if input_iam_policy_dir:
try:
string_policy = files.ReadFileContents(
os.path.join(
files.ExpandHomeDir(input_iam_policy_dir),
ar_project,
f"{repo['repository']}.yaml",
)
)
except files.MissingFileError:
log.status.Print(
f"No policy change found for {ar_project}/{repo['repository']}."
" Skipping this repository."
)
return False, True
con = console_attr.GetConsoleAttr()
log.status.Print(
con.Colorize(
f"Applying policy to repository {ar_project}/{repo['repository']}",
"green",
)
)
new_binding = encoding.PyValueToMessage(
ar_requests.GetMessages().Policy, yaml.load(string_policy)
)
try:
ar_requests.SetIamPolicy(
"projects/{}/locations/{}/repositories/{}".format(
ar_project, repo["location"], repo["repository"]
),
new_binding,
)
return True, True
except apitools_exceptions.HttpError as e:
log.status.Print(
"\nFailed to update iam policy:\n{}\n".format(
json.loads(e.content)["error"]["message"]
)
)
raise e
gcr_auth, failures = copy.deepcopy(
upgrade_util.iam_map(
host if has_bucket else "",
gcr_project,
skip_bucket=(not has_bucket),
from_ar_permissions=False,
best_effort=True,
use_analyze=use_analyze,
)
)
if not gcr_auth and failures:
WarnNoAuthGenerated(pkg_dev=pkg_dev)
return True, False
ar_non_repo_auth, _ = copy.deepcopy(
upgrade_util.iam_map(
"",
ar_project,
skip_bucket=True,
from_ar_permissions=True,
best_effort=True,
use_analyze=use_analyze,
)
)
# The AR auth policy on the repo. Doesn't include project+ auth above
ar_repo_policy = ar_requests.GetIamPolicy(
"projects/{}/locations/{}/repositories/{}".format(
ar_project, repo["location"], repo["repository"]
)
)
missing_auth = CalculateMissingAuth(
gcr_auth, ar_non_repo_auth, ar_repo_policy
)
if missing_auth or output_iam_policy_dir:
continue_checking_auth = RecommendAuthChange(
upgrade_util.policy_from_map(missing_auth),
ar_repo_policy,
repo["location"],
ar_project,
repo["repository"],
failures=failures,
pkg_dev=pkg_dev,
output_iam_policy_dir=output_iam_policy_dir,
)
return True, continue_checking_auth
elif failures:
# Nothing to do, but we still need to warn
con = console_attr.GetConsoleAttr()
warning_message = (
"Unable to confirm IAM bindings for"
f" {ar_project}/{repo['repository']} are sufficient because you do not"
" have access to view IAM bindings for the following resources:"
f" {failures}\nUse --log-http to see detailed errors."
)
log.status.Print(f"\n{con.Colorize('Warning:', 'red')} {warning_message}")
return True, True
# No diffs found, continue checking auth
return False, True
def MigrateToArtifactRegistry(unused_ref, args):
"""Runs the automigrate wizard for the current project."""
if args.projects:
projects = args.projects.split(",")
base.DisableUserProjectQuota()
else:
projects = [args.project or properties.VALUES.core.project.GetOrFail()]
project_ids = []
for project in projects:
if project.isnumeric():
project_ids.append(
projects_api.Get(project_util.ParseProject(project)).projectId
)
else:
project_ids.append(project)
projects = project_ids
recent_images = args.recent_images
last_uploaded_versions = args.last_uploaded_versions
from_gcr = args.from_gcr
to_pkg_dev = args.to_pkg_dev
copy_only = args.copy_only
canary_reads = args.canary_reads
skip_iam = args.skip_iam_update
ar_location = args.pkg_dev_location
skip_pre_copy = args.skip_pre_copy
use_analyze = args.use_analyze_iam
if ar_location and not to_pkg_dev:
log.status.Print(
"--pkg-dev-location is only used when migrating to pkg.dev repos"
)
sys.exit(1)
if recent_images is not None and (recent_images < 30 or recent_images > 180):
log.status.Print("--recent-images must be between 30 and 180 inclusive")
sys.exit(1)
output_iam_policy_dir = args.output_iam_policy_dir
input_iam_policy_dir = args.input_iam_policy_dir
if output_iam_policy_dir and (skip_iam or copy_only):
log.status.Print(
"--output-iam-policy-dir is only used when determining iam policy"
)
sys.exit(1)
if input_iam_policy_dir and (skip_iam or copy_only):
log.status.Print(
"--input-iam-policy-dir is only used when determining iam policy"
)
sys.exit(1)
if input_iam_policy_dir and output_iam_policy_dir:
log.status.Print(
"--input-iam-policy-dir and --output-iam-policy-dir should not be"
" called in the same invocation"
)
sys.exit(1)
if input_iam_policy_dir:
if not os.path.isdir(files.ExpandHomeDir(input_iam_policy_dir)):
log.status.Print("--input-iam-policy-dir must be a directory")
sys.exit(1)
if canary_reads is not None and (canary_reads < 0 or canary_reads > 100):
log.status.Print("--canary-reads must be between 0 and 100 inclusive")
sys.exit(1)
if (args.projects or args.project) and (from_gcr or to_pkg_dev):
log.status.Print(
"Projects argument may not be used when providing --from-gcr and"
" --to-pkg-dev"
)
sys.exit(1)
if bool(from_gcr) != bool(to_pkg_dev):
log.status.Print(
"--from-gcr and --to-pkg-dev-repo should be provided together"
)
sys.exit(1)
if last_uploaded_versions and recent_images:
log.status.Print(
"Only one of --last-uploaded-versions and --recent-images can be used"
)
sys.exit(1)
if to_pkg_dev:
s = from_gcr.split("/", 2)
if len(s) != 2:
log.status.Print("--from-gcr must be of the form {host}/{project}")
sys.exit(1)
gcr_host, gcr_project = s
s = to_pkg_dev.split("/", 2)
if len(s) != 2:
log.status.Print("--to-pkg-dev must be of the form {project}/{repo}")
sys.exit(1)
ar_project, ar_repo = s
if "gcr.io" in ar_repo:
log.status.Print(
"--to-pkg-dev is only used for pkg.dev repos. Use --projects to"
" migrate to a gcr.io repo"
)
sys.exit(1)
if gcr_host not in _ALLOWED_GCR_REPO_LOCATION.keys():
log.status.Print(
"{gcr_host} is not a valid gcr host. Valid hosts: {hosts}".format(
gcr_host=gcr_host,
hosts=", ".join(_ALLOWED_GCR_REPO_LOCATION.keys()),
)
)
sys.exit(1)
location = _ALLOWED_GCR_REPO_LOCATION[gcr_host]
if ar_location:
location = ar_location
host = "{}{}-docker.pkg.dev".format(
properties.VALUES.artifacts.registry_endpoint_prefix.Get(), location
)
if not copy_only:
CreatePkgDevIfMissing(host, location, ar_project, ar_repo)
has_bucket = GetGCRRepos(
{
k: v
for (k, v) in _GCR_BUCKETS.items()
if v["repository"] == gcr_host
},
gcr_project,
)
if not skip_iam:
if input_iam_policy_dir:
cont = console_io.PromptContinue(
f"\nContinuing will update {ar_project}/{ar_repo} IAM policy"
f" based on {input_iam_policy_dir}.",
default=True,
)
if not cont:
return None
diffs_found, _ = SetupAuthForRepository(
gcr_project=gcr_project,
ar_project=ar_project,
host=gcr_host,
repo={"location": location, "repository": ar_repo},
has_bucket=has_bucket,
pkg_dev=True,
input_iam_policy_dir=input_iam_policy_dir,
output_iam_policy_dir=output_iam_policy_dir,
use_analyze=use_analyze,
)
if output_iam_policy_dir:
if diffs_found:
log.status.Print(
"\nAll policies written. After verifying IAM policies, rerun"
" this tool with"
f" --input-iam-policy-dir={output_iam_policy_dir} to complete"
" migration"
)
else:
log.status.Print(
"No IAM changes are needed. Rerun this tool without"
" --output-iam-policy to complete migration"
)
return None
if not WrappedCopyImagesFromGCR(
[host],
to_pkg_dev,
recent_images,
last_uploaded=last_uploaded_versions,
copy_from=from_gcr,
max_threads=args.max_threads,
):
return None
if not copy_only:
log.status.Print(
"\nAny reference to {gcr} will "
"still need to be updated to reference {ar}".format(
gcr=from_gcr, ar=host + "/" + to_pkg_dev
)
)
return None
messages = ar_requests.GetMessages()
if copy_only:
copying_projects = projects
enabled_projects = []
disabled_projects = []
invalid_projects = []
partial_projects = []
else:
redirection_state = GetRedirectionStates(projects)
enabled_projects = []
disabled_projects = []
copying_projects = []
invalid_projects = []
partial_projects = []
for project, state in redirection_state.items():
if (
state
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_ENABLED
):
enabled_projects.append(project)
elif (
state
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_ENABLED_AND_COPYING
):
copying_projects.append(project)
elif (
state
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_DISABLED
):
disabled_projects.append(project)
elif (
state
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_PARTIAL_AND_COPYING
):
partial_projects.append(project)
else:
invalid_projects.append(project)
if invalid_projects:
log.status.Print(
"Skipping migration for projects in unsupported state: {}".format(
invalid_projects
)
)
if len(invalid_projects) == len(projects):
sys.exit(1)
# Exit early if all projects are migrated
if (
len(enabled_projects) == len(projects)
and canary_reads != 100
and canary_reads != 0
):
log.status.Print(
"Artifact Registry is already handling all requests for *gcr.io repos"
" for the provided projects. If there are images you still need to"
" copy, use the --copy-only flag."
)
sys.exit(1)
if enabled_projects and canary_reads != 100 and canary_reads != 0:
log.status.Print(
"Skipping already migrated projects: {}\n".format(enabled_projects)
)
# Allow going backwards -> 100% canary reads, which is the safest way to
# revert
# Also allow backwards ->0% canary reads, because it is clear user wants to
# disable redirection
# Disallow other values, because those are probably accidents when grouping
# multiple projects
if canary_reads == 100 or canary_reads == 0:
partial_projects.extend(copying_projects)
copying_projects = []
partial_projects.extend(enabled_projects)
enabled_projects = []
elif canary_reads is not None and copying_projects:
log.status.Print(
f"Skipping projects in final copying: {copying_projects}\n"
"Only --canary-reads=100 (safer) or --canary-reads=0 are"
" allowed for projects with migrated writes.\n",
)
copying_projects = []
# Only do the initial steps for projects where we haven't started redirection
# yet. Otherwise, we pick up where we left off.
if disabled_projects and not input_iam_policy_dir:
if not MaybeCreateMissingRepos(
disabled_projects, automigrate=True, dry_run=False
):
return None
# Re-check list of repos because we tried to create some
# Also get list for copying projects while we're at it, because we'll
# need them later
existing_repos = {}
for project in disabled_projects + copying_projects + partial_projects:
existing_repos[project] = GetExistingRepos(project)
projects_to_redirect = []
dangerous_projects = []
repo_bucket_map = {}
for project in disabled_projects:
if not existing_repos[project]:
log.status.Print(
"Skipping project {} because it has no Artifact Registry repos to"
" migrate to".format(project)
)
continue
# If we're missing any repos, check if they're repos with GCR buckets
missing_bucket_repos = []
repos_with_gcr_buckets = GetGCRRepos(_GCR_BUCKETS, project)
repo_bucket_map[project] = repos_with_gcr_buckets
for g in repos_with_gcr_buckets:
if g not in [r["repository"] for r in existing_repos[project]]:
missing_bucket_repos.append(g)
if missing_bucket_repos:
dangerous_projects.append(project)
else:
projects_to_redirect.append(project)
if projects_to_redirect or partial_projects:
for project in dangerous_projects:
log.status.Print(
"Skipping project {} because it has a Container Registry"
" bucket without a corresponding Artifact Registry"
" repository.".format(project)
)
# If all listed projects are dangerous, this may be intentional. Allow it, but
# warn first
elif dangerous_projects:
c = console_attr.GetConsoleAttr()
cont = console_io.PromptContinue(
"\n{project_str} has Container Registry buckets without"
" corresponding Artifact Registry repositories. Existing Container"
" Registry data will become innacessible.".format(
project_str="This project"
if len(dangerous_projects) == 1
else "Each project"
),
"Do you wish to continue " + c.Colorize("(not recommended)", "red"),
default=False,
)
if not cont:
return None
projects_to_redirect = dangerous_projects
# Pre-copy the image. Don't bother with copy-only because we'll do it later.
# Pre-copy serves two purposes:
# 1) A smoke test such that if something breaks, it breaks BEFORE we redirect
# 2) Gets most of the commonly used images copied ahead of time to avoid
# a load/quota spike at redirection time
if (
not copy_only
and not output_iam_policy_dir
and projects_to_redirect
and not skip_pre_copy
):
pre_copied_projects = []
log.status.Print(
"\nCopying initial images (additional images will be copied later)...\n"
)
for project in projects_to_redirect:
gcr_hosts = [r["repository"] for r in existing_repos[project]]
last_uploaded_for_precopy = 100
if last_uploaded_versions:
last_uploaded_for_precopy = min(last_uploaded_versions,
last_uploaded_for_precopy)
if WrappedCopyImagesFromGCR(
gcr_hosts,
project,
# Reduce down-time by only copying recent images. This is enough to
# address the 2 points above
recent_images=7,
last_uploaded=last_uploaded_for_precopy,
# None of these projects have been redirected yet.
convert_to_pkg_dev=True,
max_threads=args.max_threads,
pre_copy=True,
):
# Don't even try redirecting projects that don't have auth setup
# correctly(b/327496533)
pre_copied_projects.append(project)
projects_to_redirect = pre_copied_projects
if not skip_iam:
if input_iam_policy_dir:
cont = console_io.PromptContinue(
"\nContinuing will update IAM policies for repositories in the"
" following projects based on the files in"
f" {input_iam_policy_dir}:\n{projects_to_redirect}",
default=True,
)
if not cont:
return None
diffs_found = False
needs_removal = []
for project in projects_to_redirect:
try:
project_diffs, continue_checking_auth = SetupAuthForProject(
project,
existing_repos[project],
repo_bucket_map[project],
output_iam_policy_dir=output_iam_policy_dir,
input_iam_policy_dir=input_iam_policy_dir,
use_analyze=use_analyze,
)
except apitools_exceptions.HttpError as e:
needs_removal.append(project)
log.status.Print(
f"Skipping {project} due to error setting policy:"
f" {json.loads(e.content)['error']['message']}"
)
continue
if project_diffs:
diffs_found = True
elif input_iam_policy_dir:
if not os.path.isdir(
os.path.join(files.ExpandHomeDir(input_iam_policy_dir), project)
):
log.status.Print(
f"Skipping {project} because no policy directory found"
)
needs_removal.append(project)
if not continue_checking_auth:
break
for project in needs_removal:
projects_to_redirect.remove(project)
if output_iam_policy_dir:
if diffs_found:
log.status.Print(
"\nAll policies written. After verifying IAM policies, rerun this"
f" tool with --input-iam-policy-dir={output_iam_policy_dir} to"
" complete migration"
)
else:
log.status.Print(
"No IAM changes are needed. Rerun this tool without"
" --output-iam-policy to complete migration"
)
return None
if input_iam_policy_dir and not diffs_found:
log.status.Print(f"No IAM policies found at {input_iam_policy_dir}")
sys.exit(1)
projects_to_redirect.extend(partial_projects)
if canary_reads is not None and projects_to_redirect:
log.status.Print(
f"\nThe next step will redirect {canary_reads}% of *gcr.io read"
" traffic to Artifact Registry. All pushes will still write to"
" Container Registry. While canarying, Artifact Registry will attempt"
" to copy missing images from Container Registry at request time.\n"
)
update = console_io.PromptContinue(
"Projects to redirect: {}".format(projects_to_redirect),
default=True,
)
if not update:
return None
for project in projects_to_redirect:
if SetRedirectionStatus(
project,
messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_PARTIAL_AND_COPYING,
pull_percent=canary_reads,
):
copying_projects.append(project)
log.status.Print(
f"\n{canary_reads}% of *gcr.io read traffic is now being served by"
f" Artifact Registry for {project}. Missing images are copied from"
" Container Registry.\nTo send traffic back to Container Registry,"
" run:\n gcloud artifacts settings disable-upgrade-redirection"
f" --project={project}\nTo send all traffic to Artifact"
" Registry, re-run this script without --canary-reads"
)
return None
if projects_to_redirect:
caveat = ""
if recent_images:
caveat = (
f" that have been pulled or pushed in the last {recent_images} days"
)
log.status.Print(
"\nThe next step will redirect all *gcr.io traffic to"
f" Artifact Registry. Remaining Container Registry images{caveat} will"
" be copied. During migration, Artifact Registry"
" will serve *gcr.io requests for images it doesn't have yet by"
" copying them from Container Registry at request time. Deleting"
" images from *gcr.io repos in the middle of migration might not be"
" effective.\n\n"
"IMPORTANT: Make sure to update any relevant VPC-SC policies before"
" migrating. Once *gcr.io is redirected to Artifact Registry, the"
# gcloud-disable-gdu-domain
" artifactregistry.googleapis.com service will be checked for VPC-SC"
# gcloud-disable-gdu-domain
" instead of containerregistry.googleapis.com.\n"
)
update = console_io.PromptContinue(
"Projects to redirect: {}".format(projects_to_redirect),
default=True,
)
if not update:
return None
for project in projects_to_redirect:
if SetRedirectionStatus(
project,
messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_ENABLED_AND_COPYING,
):
copying_projects.append(project)
rollback_command = (
"*gcr.io traffic is now being served by Artifact Registry for"
f" {project}. Missing images are being copied from Container"
" Registry\nTo send all write traffic back to Container Registry,"
" re-run this command with --canary-reads=100\n"
)
# Don't even mention full rollback if doing a partial migration, because
# it is a footgun. If doing a full migration, give both options.
if not partial_projects:
rollback_command += (
"To send all read and write traffic to "
" Container Registry, instead run:\n"
" gcloud artifacts settings disable-upgrade-redirection"
f" --project={project}\n"
)
log.status.Print(rollback_command)
if not copying_projects:
return None
if copy_only:
log.status.Print("\nCopying images...\n")
else:
log.status.Print("\nCopying remaining images...\n")
# Note that we're already copying automatically at this point. This step
# just makes sure we've copied all the remaining images before we turn off
# copying. This could take a while for large repos.
failed_copies = []
to_enable = []
unredirected_copying_projects = set()
if copy_only:
for project, state in GetRedirectionStates(projects).items():
if (
state
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_DISABLED
or state
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_PARTIAL_AND_COPYING
):
unredirected_copying_projects.add(project)
for project in copying_projects:
gcr_hosts = [r["host"] for r in existing_repos[project]]
# If a project is unredirected, we can't send the request through the
# gcr.io endpoint and need to convert to the pkg.dev url
convert_to_pkg_dev = project in unredirected_copying_projects
if convert_to_pkg_dev:
# Since we're not using the hosts directly, always use the repository in
# case the host is overriden
gcr_hosts = [r["repository"] for r in existing_repos[project]]
if WrappedCopyImagesFromGCR(
gcr_hosts,
project,
recent_images,
last_uploaded=last_uploaded_versions,
convert_to_pkg_dev=convert_to_pkg_dev,
max_threads=args.max_threads,
):
to_enable.append(project)
else:
failed_copies.append(project)
if copy_only:
return None
if failed_copies:
if to_enable:
log.status.Print("\nOnly completing migration for successful projects")
else:
cont = console_io.PromptContinue(
"\nAll projects had image copy failures. Continuing will disable"
" further copying and images will be missing.",
"Continue anyway?",
default=False,
)
if cont:
to_enable = failed_copies
if not cont:
return None
log.status.Print()
for project in to_enable:
if SetRedirectionStatus(
project,
messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_ENABLED,
):
log.status.Print(
"*gcr.io traffic is now being fully served by Artifact Registry for"
" {project}. Images will no longer be copied from Container Registry"
" for this project.".format(project=project)
)
enabled_projects.append(project)
log.status.Print(
"\nThe following projects are fully migrated: {}".format(enabled_projects)
)
remaining_projects = list(set(projects) - set(enabled_projects))
if remaining_projects:
log.status.Print(
"The following projects still need to finish being migrated: {}".format(
remaining_projects
)
)
log.status.Print(
"\nThis script can be re-run to migrate any projects that haven't "
"finished."
)
def WrappedCopyImagesFromGCR(
hosts,
project_repo,
recent_images,
last_uploaded,
copy_from="same",
convert_to_pkg_dev=False,
max_threads=8,
pre_copy=False,
):
"""Copies images from GCR for all hosts and handles auth error."""
original_project_repo = project_repo
project_repo = project_repo.replace(":", "/")
try:
results = collections.defaultdict(int)
if copy_from == "same":
if len(hosts) == 1:
message = f"Copying images for {hosts[0]}/{project_repo}... "
else:
message = f"Copying images for {project_repo}... "
else:
if len(hosts) == 1:
message = f"Copying images to {hosts[0]}/{project_repo}... "
else:
message = f"Copying images to {project_repo}... "
# TODO: b/325516793 - Uncomment once we can get test coverage
# def PrintResults():
# nonlocal results
# message = (
# f"({results['tagsCopied']} tags copied,"
# f" {results['manifestsCopied']} manifests copied,"
# f" {results['tagsFailed'] + results['manifestsFailed']} failures) "
# )
# if results["new_failure"]:
# message += f"Example failure: {results['new_failure']} "
# return message
with progress_tracker.ProgressTracker(
message,
tick_delay=2,
no_spacing=True,
):
with futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
thread_futures = []
for host in sorted(hosts):
if convert_to_pkg_dev:
endpoint_prefix = (
properties.VALUES.artifacts.registry_endpoint_prefix.Get()
)
location = _ALLOWED_GCR_REPO_LOCATION[host]
url = f"{endpoint_prefix}{location}-docker.pkg.dev/{project_repo}/{host}"
else:
url = f"{host}/{project_repo}"
copy_args = [
thread_futures,
executor if max_threads > 1 else None,
url,
recent_images,
last_uploaded,
copy_from,
results,
]
if max_threads > 1:
thread_futures.append(
executor.submit(CopyImagesFromGCR, *copy_args)
)
else:
CopyImagesFromGCR(*copy_args)
while thread_futures:
future = thread_futures.pop()
future.result()
log.status.Print(
"\n{project}: Successfully copied {tags} additional tags and"
" {manifests} additional manifests. There were {failures} failures."
.format(
project=project_repo,
tags=results["tagsCopied"],
manifests=results["manifestsCopied"],
failures=results["tagsFailed"] + results["manifestsFailed"],
)
)
if results["tagsFailed"] + results["manifestsFailed"]:
log.status.Print("\nExample images that failed to copy:")
for example_failure in results["example_failures"]:
log.status.Print(example_failure)
# Some errors are okay when pre-copying. We'll just try again later
# Print out the GCR data loss failures if there's any.
if results["manifestsFailedWithNotFound"] > 0:
log.status.Print(
"\nAmong those failures, there are {not_found} image copy"
" failures due to parts of the image missing from GCR."
" You may try pulling the images directly from GCR to confirm."
" Because the images are already currupted in GCR, there's no"
" action required for these images.".format(
not_found=results["manifestsFailedWithNotFound"],
),
)
log.status.Print(
"\nExample images that failed to copy due to missing data in GCR:"
)
for example_not_found in results["not_found_failures"]:
log.status.Print(example_not_found)
return pre_copy
return True
except docker_http.V2DiagnosticException as e:
match = re.search("requires (.*) to have storage.objects.", str(e))
if not match:
raise
con = console_attr.GetConsoleAttr()
project = original_project_repo
if copy_from != "same":
project = copy_from.split("/")[-1]
log.status.Print(
con.Colorize("\nERROR:", "red")
+ " The Artifact Registry service account doesn't have access to"
f" {project} for copying images\nThe following command will grant"
" the necessary access (may take a few minutes):\n gcloud projects"
" add-iam-policy-binding"
f" {project} --member='serviceAccount:{match[1]}'"
" --role='roles/storage.objectViewer'\nYou can re-run this script"
" after granting access."
)
return False
def CopyImagesFromGCR(
thread_futures,
executor,
repo_path,
recent_images,
last_uploaded,
copy_from,
results,
):
"""Recursively copies images from GCR."""
http_obj = util.Http(timeout=10 * 60)
repository = docker_name.Repository(repo_path)
next_page = ""
backoff = 5
while True:
try:
with docker_image.FromRegistry(
basic_creds=util.CredentialProvider(),
name=repository,
transport=http_obj,
) as image:
query = f"?CopyFromGCR={copy_from}"
if recent_images:
query += f"&PullDays={recent_images}"
if last_uploaded:
query += f"&MaxVersions={last_uploaded}"
if next_page:
query += f"&NextPage={next_page}"
tags_payload = json.loads(
# pylint:disable-next=protected-access
image._content(f"tags/list{query}").decode("utf8")
)
if tags_payload.get("nextPage"):
next_page = tags_payload["nextPage"]
else:
break
except requests.exceptions.ReadTimeout:
continue
except docker_http.V2DiagnosticException as e:
# Gateway Timeout
if e.status == 504:
continue
# Too Many Requests
if e.status == 429:
# All requests will likely hit quota at ~same time, so randomize backoff
# to spread them out
if backoff < 100:
backoff += random.randrange(1, 25)
time.sleep(backoff)
continue
raise
results["manifestsCopied"] += tags_payload.get("manifestsCopied", 0)
results["tagsCopied"] += tags_payload.get("tagsCopied", 0)
results["manifestsFailed"] += tags_payload.get("manifestsFailed", 0)
results["manifestsFailedWithNotFound"] += tags_payload.get(
"manifestsFailedWithNotFound", 0
)
results["tagsFailed"] += tags_payload.get("tagsFailed", 0)
failures = tags_payload.get("exampleFailures", [])
if failures:
if not results["example_failures"]:
results["example_failures"] = []
results["example_failures"] = (results["example_failures"] + failures)[0:10]
not_found_failures = tags_payload.get("exampleFailuresWithNotFound", [])
if not_found_failures:
if not results["not_found_failures"]:
results["not_found_failures"] = []
results["not_found_failures"] = (
results["not_found_failures"] + not_found_failures
)[0:10]
for child in tags_payload["child"]:
copy_args = [
thread_futures,
executor,
repo_path + "/" + child,
recent_images,
last_uploaded,
copy_from,
results,
]
if executor:
thread_futures.append(executor.submit(CopyImagesFromGCR, *copy_args))
else:
CopyImagesFromGCR(*copy_args)
return results
# Returns if we should continue with migration
def MaybeCreateMissingRepos(projects, automigrate, dry_run):
"""Creates missing repos if needed and requested by the user."""
if len(projects) == 1:
missing_repos = {projects[0]: GetRedirectionEnablementReport(projects[0])}
else:
missing_repos = GetMultiProjectRedirectionEnablementReport(projects)
if dry_run:
log.status.Print("Dry run enabled, no changes made.")
return False
num_missing_repos = sum(len(r) for r in missing_repos.values())
if num_missing_repos:
con = console_attr.GetConsoleAttr()
s = ("s" if num_missing_repos > 1 else "")
create_repos = console_io.PromptContinue(
f"\ngcloud can automatically create the {num_missing_repos} missing"
f" repo{s} in Artifact Registry.\nIf you would like to setup CMEK for"
" these repos, exit now and create them manually instead.",
"Create missing repos " + con.Colorize("(recommended)", "green"),
default=automigrate,
)
if not create_repos:
return True
for project, repos in missing_repos.items():
CreateRepositories(project, repos)
else:
con = console_attr.GetConsoleAttr()
log.status.Print(
con.Colorize("OK: ", "green")
+ "All Container Registry repositories have equivalent Artifact"
" Registry "
"repostories.\n"
)
return True
def CreatePkgDevIfMissing(host, location, project, repo):
"""Create a pkg.dev repository if it doesn't exist.
Args:
host: AR hostname (string)
location: repo location (string)
project: project id of the repo (string)
repo: repo_id to be created (string)
"""
try:
ar_requests.GetRepository(
f"projects/{project}/locations/{location}/repositories/{repo}"
)
except apitools_exceptions.HttpNotFoundError:
con = console_attr.GetConsoleAttr()
console_io.PromptContinue(
con.Colorize(
f"\nNo repository found at {host}/{project}/{repo}", "yellow"
),
"Create missing repository?",
default=True,
cancel_on_no=True,
)
CreateRepositories(project, [{"location": location, "repository": repo}])
def CreateRepositories(project, repos):
"""Creates repositories in Artifact Registry."""
messages = ar_requests.GetMessages()
op_resources = []
for repo in repos:
repository_message = messages.Repository(
name="projects/{}/locations/{}/repositories/{}".format(
project, repo["location"], repo["repository"]
),
description="Created by gcloud",
format=messages.Repository.FormatValueValuesEnum.DOCKER,
)
try:
op = ar_requests.CreateRepository(
project, repo["location"], repository_message
)
op_resources.append(
resources.REGISTRY.ParseRelativeName(
op.name,
collection="artifactregistry.projects.locations.operations",
)
)
except apitools_exceptions.HttpForbiddenError as e:
log.status.Print(
f"Failed to create repository {repo['location']}:"
f" {json.loads(e.content)['error']['message']}\n"
)
LogUserPermissionDeniedError(project)
except apitools_exceptions.HttpError as e:
log.status.Print(
f"Failed to create repository {repo['location']}:"
f" {json.loads(e.content)['error']['message']}\n"
)
client = ar_requests.GetClient()
for resource in op_resources:
waiter.WaitFor(
waiter.CloudOperationPollerNoResources(
client.projects_locations_operations
),
resource,
message="Waiting for repo creation to complete...",
)
def EnableUpgradeRedirection(unused_ref, args):
"""Enables upgrade redirection for the active project."""
project = GetProject(args)
dry_run = args.dry_run
log.status.Print("Performing redirection enablement checks...\n")
messages = ar_requests.GetMessages()
settings = ar_requests.GetProjectSettings(project)
current_status = settings.legacyRedirectionState
if (
current_status
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_ENABLED
or current_status == messages.ProjectSettings
):
log.status.Print(
f"Project {project} is already using Artifact Registry for all *gcr.io"
" traffic."
)
elif (
current_status
== messages.ProjectSettings.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_FINALIZED
):
log.status.Print(
f"Redirection is already enabled (and finalized) for project {project}."
)
return None
if not MaybeCreateMissingRepos([project], False, dry_run):
return None
con = console_attr.GetConsoleAttr()
update = console_io.PromptContinue(
"\nThis action will redirect all Container Registry traffic to Artifact "
+ f"Registry for project {project}."
+ con.Colorize(
" Your existing images and IAM policies will NOT be copied.\n", "red"
)
+ "To preserve existing GCR behavior, consider running `gcloud artifacts"
f" docker upgrade migrate --project={project}` instead.",
default=True,
)
if not update:
log.status.Print("No changes made.")
return None
return ar_requests.EnableUpgradeRedirection(GetProject(args))
def DisableUpgradeRedirection(unused_ref, args):
"""Disables upgrade redirection for the active project."""
project = GetProject(args)
messages = ar_requests.GetMessages()
con = console_attr.GetConsoleAttr()
log.status.Print("Disabling upgrade redirection...\n")
# If the current state is finalized, then disabling is not possible
log.status.Print("Checking current redirection status...\n")
settings = ar_requests.GetProjectSettings(GetProject(args))
current_status = settings.legacyRedirectionState
if (current_status == messages.ProjectSettings
.LegacyRedirectionStateValueValuesEnum.REDIRECTION_FROM_GCR_IO_FINALIZED):
log.status.Print(
con.Colorize("FAIL:", "red") + " Redirection has already "
"been finalized for project {}. Disabling redirection is not possible "
"once it has been finalized.".format(project))
return None
update = console_io.PromptContinue(
"This action will disable the redirection of Container Registry traffic "
f"to Artifact Registry for project {project}\n\n"
+ con.Colorize("WARNING:", "red")
+ " This will disable redirection for both read and write traffic to"
f" Artifact Registry for project {project} and you may lose access to"
" images pushed to Artifact Registry. To disable redirection for write"
" traffic only, run:\n gcloud artifacts docker upgrade migrate"
f" --project={project} --canary-reads=100",
default=True,
)
if not update:
log.status.Print("No changes made.")
return None
return ar_requests.DisableUpgradeRedirection(project)
def SanitizeRemoteRepositoryConfig(unused_ref, args, request):
"""Make sure that only one remote source is set at the same time."""
if args.remote_mvn_repo:
request.repository.remoteRepositoryConfig.dockerRepository = None
request.repository.remoteRepositoryConfig.npmRepository = None
request.repository.remoteRepositoryConfig.pythonRepository = None
request.repository.remoteRepositoryConfig.aptRepository = None
request.repository.remoteRepositoryConfig.yumRepository = None
elif args.remote_docker_repo:
request.repository.remoteRepositoryConfig.mavenRepository = None
request.repository.remoteRepositoryConfig.npmRepository = None
request.repository.remoteRepositoryConfig.pythonRepository = None
request.repository.remoteRepositoryConfig.aptRepository = None
request.repository.remoteRepositoryConfig.yumRepository = None
elif args.remote_npm_repo:
request.repository.remoteRepositoryConfig.dockerRepository = None
request.repository.remoteRepositoryConfig.mavenRepository = None
request.repository.remoteRepositoryConfig.pythonRepository = None
request.repository.remoteRepositoryConfig.aptRepository = None
request.repository.remoteRepositoryConfig.yumRepository = None
elif args.remote_python_repo:
request.repository.remoteRepositoryConfig.dockerRepository = None
request.repository.remoteRepositoryConfig.npmRepository = None
request.repository.remoteRepositoryConfig.mavenRepository = None
request.repository.remoteRepositoryConfig.aptRepository = None
request.repository.remoteRepositoryConfig.yumRepository = None
elif args.remote_apt_repo:
request.repository.remoteRepositoryConfig.dockerRepository = None
request.repository.remoteRepositoryConfig.mavenRepository = None
request.repository.remoteRepositoryConfig.npmRepository = None
request.repository.remoteRepositoryConfig.pythonRepository = None
request.repository.remoteRepositoryConfig.yumRepository = None
elif args.remote_yum_repo:
request.repository.remoteRepositoryConfig.dockerRepository = None
request.repository.remoteRepositoryConfig.mavenRepository = None
request.repository.remoteRepositoryConfig.npmRepository = None
request.repository.remoteRepositoryConfig.pythonRepository = None
request.repository.remoteRepositoryConfig.aptRepository = None
return request
def GetMimetype(path):
mime_type, _ = mimetypes.guess_type(path)
return mime_type or "application/octet-stream"