File: //snap/google-cloud-cli/current/lib/surface/artifacts/docker/images/scan.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Scan a container image using the On-Demand Scanning API."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import json
from googlecloudsdk.api_lib.ondemandscanning import util as api_util
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.artifacts import flags
from googlecloudsdk.command_lib.artifacts import ondemandscanning_util as ods_util
from googlecloudsdk.command_lib.util.anthos import binary_operations
from googlecloudsdk.command_lib.util.apis import arg_utils
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.updater import local_state
from googlecloudsdk.core.updater import update_manager
from googlecloudsdk.core.util import platforms
import six
# Extract stage messages to constants for convenience.
SCAN_MESSAGE = 'Scanning container image'
EXTRACT_MESSAGE = ('Locally extracting packages and versions from {} '
'container image')
RPC_MESSAGE = 'Remotely initiating analysis of packages and versions'
POLL_MESSAGE = 'Waiting for analysis operation to complete'
# Error messages used to fill in for unknown error cases.
EXTRACTION_KILLED_ERROR_TEMPLATE = (
'Extraction failed: image extraction was either stopped or crashed '
'(possibly due to a lack of available memory) with exit code '
'{exit_code}')
UNKNOWN_EXTRACTION_ERROR_TEMPLATE = (
'Extraction failed: unknown error (exit code: {exit_code})')
@base.DefaultUniverseOnly
@base.ReleaseTracks(base.ReleaseTrack.BETA)
class ScanBeta(base.Command):
"""Perform a vulnerability scan on a container image.
You can scan a container image in a Google Cloud registry (Artifact Registry
or Container Registry), or a local container image.
Reference an image by tag or digest using any of the formats:
Artifact Registry:
LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY-ID/IMAGE[:tag]
LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY-ID/IMAGE@sha256:digest
Container Registry:
[LOCATION.]gcr.io/PROJECT-ID/REPOSITORY-ID/IMAGE[:tag]
[LOCATION.]gcr.io/PROJECT-ID/REPOSITORY-ID/IMAGE@sha256:digest
Local:
IMAGE[:tag]
"""
detailed_help = {
'DESCRIPTION':
'{description}',
'EXAMPLES':
"""\
Start a scan of a container image stored in Artifact Registry:
$ {command} us-west1-docker.pkg.dev/my-project/my-repository/busy-box@sha256:abcxyz --remote
Start a scan of a container image stored in the Container Registry, and perform the analysis in Europe:
$ {command} eu.gcr.io/my-project/my-repository/my-image:latest --remote --location=europe
Start a scan of a container image stored locally, and perform the analysis in Asia:
$ {command} ubuntu:latest --location=asia
"""
}
@staticmethod
def Args(parser):
flags.GetResourceURIArg().AddToParser(parser)
flags.GetRemoteFlag().AddToParser(parser)
flags.GetOnDemandScanningFakeExtractionFlag().AddToParser(parser)
flags.GetOnDemandScanningLocationFlag().AddToParser(parser)
flags.GetAdditionalPackageTypesFlag().AddToParser(parser)
flags.GetExperimentalPackageTypesFlag().AddToParser(parser)
flags.GetSkipPackageTypesFlag().AddToParser(parser)
flags.GetVerboseErrorsFlag().AddToParser(parser)
base.ASYNC_FLAG.AddToParser(parser)
def Run(self, args):
"""Runs local extraction then calls ODS with the results.
Args:
args: an argparse namespace. All the arguments that were provided to this
command invocation.
Returns:
AnalyzePackages operation.
Raises:
UnsupportedOS: when the command is run on a Windows machine.
"""
if platforms.OperatingSystem.IsWindows():
raise ods_util.UnsupportedOS(
'On-Demand Scanning is not supported on Windows')
# Verify that the local-extract component is installed, and prompt the user
# to install it if it's not.
try:
# If the user has access to the gcloud components manager, this will
# prompt the user to install it. If they do not have access, it will
# instead print the command to install it using a package manager.
update_manager.UpdateManager.EnsureInstalledAndRestart(['local-extract'])
except update_manager.MissingRequiredComponentsError:
# Two possibilities with this error:
# 1. The user has access to the gcloud components manager but decided
# against intalling it.
# 2. The user does not have access to the gcloud components manager. A
# message was printed to the user with the command to install the
# component using their package manager (e.g. apt-get).
raise
except local_state.InvalidSDKRootError:
# This happens when gcloud is run locally, but not when distributed.
pass
# Construct the object which invokes the `local-extract` component. This
# might still fail if the binary is run locally.
cmd = Command()
# TODO(b/173619679): Validate RESOURCE_URI argument.
# Dynamically construct the stages based on the --async flag; when
# --async=true, we do not need a separate poll stage.
stages = [
progress_tracker.Stage(
EXTRACT_MESSAGE.format('remote' if args.remote else 'local'),
key='extract'),
progress_tracker.Stage(RPC_MESSAGE, key='rpc')
]
if not args.async_:
stages += [progress_tracker.Stage(POLL_MESSAGE, key='poll')]
messages = self.GetMessages()
with progress_tracker.StagedProgressTracker(
SCAN_MESSAGE, stages=stages) as tracker:
# Stage 1) Extract.
tracker.StartStage('extract')
operation_result = cmd(
resource_uri=args.RESOURCE_URI,
remote=args.remote,
fake_extraction=args.fake_extraction,
additional_package_types=args.additional_package_types,
experimental_package_types=args.experimental_package_types,
skip_package_types=args.skip_package_types,
verbose_errors=args.verbose_errors,
)
if operation_result.exit_code:
# Filter out any log messages on std err and only include any actual
# extraction errors.
extraction_error = None
if operation_result.stderr:
extraction_error = '\n'.join([
line for line in operation_result.stderr.splitlines()
if line.startswith('Extraction failed')
])
if not extraction_error:
if operation_result.exit_code < 0:
extraction_error = EXTRACTION_KILLED_ERROR_TEMPLATE.format(
exit_code=operation_result.exit_code,)
else:
extraction_error = UNKNOWN_EXTRACTION_ERROR_TEMPLATE.format(
exit_code=operation_result.exit_code,)
tracker.FailStage('extract',
ods_util.ExtractionFailedError(extraction_error))
return
# Parse stdout for the JSON-ified PackageData protos.
pkgs = []
for pkg in json.loads(operation_result.stdout):
pkg_data = messages.PackageData(
package=pkg['package'],
version=pkg['version'],
cpeUri=pkg['cpe_uri'],
)
if 'package_type' in pkg:
pkg_data.packageType = arg_utils.ChoiceToEnum(
pkg['package_type'],
messages.PackageData.PackageTypeValueValuesEnum)
if 'hash_digest' in pkg:
pkg_data.hashDigest = pkg['hash_digest']
pkgs += [pkg_data]
tracker.CompleteStage('extract')
# Stage 2) Make the RPC to the On-Demand Scanning API.
tracker.StartStage('rpc')
op = self.AnalyzePackages(args, pkgs)
tracker.CompleteStage('rpc')
# Stage 3) Poll the operation if requested.
response = None
if not args.async_:
tracker.StartStage('poll')
tracker.UpdateStage('poll', '[{}]'.format(op.name))
response = self.WaitForOperation(op)
tracker.CompleteStage('poll')
if args.async_:
log.status.Print('Check operation [{}] for status.'.format(op.name))
return op
return response
def AnalyzePackages(self, args, pkgs):
return api_util.AnalyzePackagesBeta(
properties.VALUES.core.project.Get(required=True),
args.location,
args.RESOURCE_URI,
pkgs)
def GetMessages(self):
return api_util.GetMessages('v1beta1')
def WaitForOperation(self, op):
return ods_util.WaitForOperation(op, 'v1beta1')
@base.ReleaseTracks(base.ReleaseTrack.GA)
class ScanGA(ScanBeta):
"""Perform a vulnerability scan on a container image.
You can scan a container image in a Google Cloud registry (Artifact Registry
or Container Registry), or a local container image.
Reference an image by tag or digest using any of the formats:
Artifact Registry:
LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY-ID/IMAGE[:tag]
LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY-ID/IMAGE@sha256:digest
Container Registry:
[LOCATION.]gcr.io/PROJECT-ID/REPOSITORY-ID/IMAGE[:tag]
[LOCATION.]gcr.io/PROJECT-ID/REPOSITORY-ID/IMAGE@sha256:digest
Local:
IMAGE[:tag]
"""
def AnalyzePackages(self, args, pkgs):
return api_util.AnalyzePackagesGA(
properties.VALUES.core.project.Get(required=True),
args.location,
args.RESOURCE_URI,
pkgs)
def GetMessages(self):
return api_util.GetMessages('v1')
def WaitForOperation(self, op):
return ods_util.WaitForOperation(op, 'v1')
class Command(binary_operations.BinaryBackedOperation):
"""Wrapper for call to the Go binary."""
def __init__(self, **kwargs):
super(Command, self).__init__(binary='local-extract', **kwargs)
def _ParseArgsForCommand(
self,
resource_uri,
remote,
fake_extraction,
additional_package_types,
experimental_package_types,
skip_package_types,
verbose_errors,
**kwargs
):
args = [
'--resource_uri=' + resource_uri,
'--remote=' + six.text_type(remote),
'--provide_fake_results=' + six.text_type(fake_extraction),
# Due to backwards compatibility issues between the gcloud command and
# the local-extract binary, provide a list of all flags to --undefok
# which were introduced after the first launch. In this way, new
# versions of the command can invoke old versions of the binary.
'--undefok='
+ ','.join([
'additional_package_types',
'skip_package_types',
'verbose_errors',
'use_scalibr',
]),
]
package_types = []
if additional_package_types:
package_types += additional_package_types
if experimental_package_types:
package_types += experimental_package_types
if package_types:
args.append('--additional_package_types=' +
six.text_type(','.join(package_types)))
if skip_package_types:
args.append(
'--skip_package_types=' + six.text_type(','.join(skip_package_types))
)
if verbose_errors:
args.append('--verbose_errors=' + six.text_type(verbose_errors))
args.append('--use_scalibr')
return args