# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This package provides DockerImage for examining docker_build outputs."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import gzip
import io
import json
import os
import string
import subprocess
import sys
import tarfile
import tempfile
import threading

from containerregistry.client import docker_creds
from containerregistry.client import docker_name
from containerregistry.client.v1 import docker_creds as v1_creds
from containerregistry.client.v1 import docker_http

import httplib2

import six
from six.moves import range  # pylint: disable=redefined-builtin
import six.moves.http_client


class DockerImage(six.with_metaclass(abc.ABCMeta, object)):
  """Interface for implementations that interact with Docker images."""

  # pytype: disable=bad-return-type
  @abc.abstractmethod
  def top(self):
    """The layer id of the topmost layer."""
  # pytype: enable=bad-return-type

  # pytype: disable=bad-return-type
  @abc.abstractmethod
  def repositories(self):
    """The json blob of tags, loaded as a dict."""
    pass
  # pytype: enable=bad-return-type

  def parent(self, layer_id):
    """The layer of id of the parent of the provided layer, or None.

    Args:
      layer_id: the id of the layer whose parentage we're asking

    Returns:
      The identity of the parent layer, or None if the root.
    """
    metadata = json.loads(self.json(layer_id))
    if 'parent' not in metadata:
      return None
    return metadata['parent']

  # pytype: disable=bad-return-type
  @abc.abstractmethod
  def json(self, layer_id):
    """The JSON metadata of the provided layer.

    Args:
      layer_id: the id of the layer whose metadata we're asking

    Returns:
      The raw json string of the layer.
    """
    pass
  # pytype: enable=bad-return-type

  # pytype: disable=bad-return-type
  @abc.abstractmethod
  def layer(self, layer_id):
    """The layer.tar.gz blob of the provided layer id.

    Args:
      layer_id: the id of the layer whose layer blob we're asking

    Returns:
      The raw blob string of the layer.
    """
    pass
  # pytype: enable=bad-return-type

  def uncompressed_layer(self, layer_id):
    """Same as layer() but uncompressed."""
    zipped = self.layer(layer_id)
    buf = io.BytesIO(zipped)
    with gzip.GzipFile(mode='rb', fileobj=buf) as f:
      return f.read()

  def diff_id(self, digest):
    """diff_id only exist in schema v22."""
    return None

  # pytype: disable=bad-return-type
  @abc.abstractmethod
  def ancestry(self, layer_id):
    """The ancestry of the given layer, base layer first.

    Args:
      layer_id: the id of the layer whose ancestry we're asking

    Returns:
      The list of ancestor IDs, layer_id first, base last.
    """
    pass
  # pytype: enable=bad-return-type

  # __enter__ and __exit__ allow use as a context manager.
  @abc.abstractmethod
  def __enter__(self):
    pass

  @abc.abstractmethod
  def __exit__(self, unused_type, unused_value, unused_traceback):
    pass
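

# A minimal usage sketch (illustrative only; this helper is not part of the
# library): any DockerImage implementation can be walked from the topmost
# layer down to the base by repeatedly following parent() links.
def _print_layer_chain(image):
  """Prints each layer id from the top layer down to the base layer."""
  layer_id = image.top()
  while layer_id is not None:
    print(layer_id)
    layer_id = image.parent(layer_id)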


# Gzip injects a timestamp into its output, which makes the compressed blob
# (and therefore its digest) non-deterministic.  To get reproducible pushes,
# freeze time.
# This approach is based on the following StackOverflow answer:
# http://stackoverflow.com/questions/264224/setting-the-gzip-timestamp-from-python
class _FakeTime(object):

  def time(self):
    return 1225856967.109


gzip.time = _FakeTime()
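

# An illustrative check (hypothetical helper, not part of the library): with
# the timestamp frozen above, compressing the same bytes twice yields
# identical output, which is what makes layer digests reproducible.
def _gzip_is_deterministic(payload=b'example'):
  """Returns True iff two gzip passes over payload produce identical bytes."""
  def _zip(data):
    buf = io.BytesIO()
    with gzip.GzipFile(mode='wb', fileobj=buf) as f:
      f.write(data)
    return buf.getvalue()
  return _zip(payload) == _zip(payload)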


class FromShardedTarball(DockerImage):
  """This decodes the sharded image tarballs from docker_build."""

  def __init__(self,
               layer_to_tarball,
               top,
               name=None,
               compresslevel=9):
    self._layer_to_tarball = layer_to_tarball
    self._top = top
    self._compresslevel = compresslevel
    self._memoize = {}
    self._lock = threading.Lock()
    self._name = name

  def _content(self, layer_id, name, memoize=True):
    """Fetches a particular path's contents from the tarball."""
    # Check our cache
    if memoize:
      with self._lock:
        if name in self._memoize:
          return self._memoize[name]

    # tarfile is inherently single-threaded:
    # https://mail.python.org/pipermail/python-bugs-list/2015-March/265999.html
    # so instead of locking, just open the tarfile for each file
    # we want to read.
    with tarfile.open(name=self._layer_to_tarball(layer_id), mode='r:') as tar:
      try:
        content = tar.extractfile(name).read()  # pytype: disable=attribute-error
      except KeyError:
        content = tar.extractfile('./' + name).read()  # pytype: disable=attribute-error

      # Populate our cache.
      if memoize:
        with self._lock:
          self._memoize[name] = content
      return content

  def top(self):
    """Override."""
    return self._top

  def repositories(self):
    """Override."""
    return json.loads(self._content(self.top(), 'repositories').decode('utf8'))

  def json(self, layer_id):
    """Override."""
    return self._content(layer_id, layer_id + '/json').decode('utf8')

  # Large, do not memoize.
  def uncompressed_layer(self, layer_id):
    """Override."""
    return self._content(layer_id, layer_id + '/layer.tar', memoize=False)

  # Large, do not memoize.
  def layer(self, layer_id):
    """Override."""
    unzipped = self.uncompressed_layer(layer_id)
    buf = io.BytesIO()
    f = gzip.GzipFile(mode='wb', compresslevel=self._compresslevel, fileobj=buf)
    try:
      f.write(unzipped)
    finally:
      f.close()

    zipped = buf.getvalue()
    return zipped

  def ancestry(self, layer_id):
    """Override."""
    p = self.parent(layer_id)
    if not p:
      return [layer_id]
    return [layer_id] + self.ancestry(p)

  # __enter__ and __exit__ allow use as a context manager.
  def __enter__(self):
    return self

  def __exit__(self, unused_type, unused_value, unused_traceback):
    pass
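

# A minimal usage sketch (hypothetical helper; the shard mapping and top layer
# id come from the docker_build output being examined): docker_build emits one
# tarball per layer, so the caller supplies a layer-id-to-tarball function.
def _example_sharded(shards, top_id):
  """Returns the layer ancestry of a sharded docker_build output.

  Args:
    shards: dict mapping each layer id to the tarball holding that layer.
    top_id: the topmost layer id.
  """
  with FromShardedTarball(shards.get, top_id) as img:
    return img.ancestry(img.top())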


def _get_top(tarball, name=None):
  """Get the topmost layer in the image tarball."""
  with tarfile.open(name=tarball, mode='r:') as tar:
    # extractfile raises KeyError for a missing member; fall back to './'.
    try:
      reps = tar.extractfile('repositories')
    except KeyError:
      reps = tar.extractfile('./repositories')
    if reps is None:
      raise ValueError('Tarball must contain a repositories file')
    repositories = json.loads(reps.read().decode('utf8'))

  if name:
    key = str(name.as_repository())
    return repositories[key][name.tag]

  if len(repositories) != 1:
    raise ValueError('Tarball must contain a single repository, '
                     'or a name must be specified to FromTarball.')

  for (unused_repo, tags) in six.iteritems(repositories):
    if len(tags) != 1:
      raise ValueError('Tarball must contain a single tag, '
                       'or a name must be specified to FromTarball.')
    for (unused_tag, layer_id) in six.iteritems(tags):
      return layer_id

  raise Exception('Unreachable code in _get_top()')
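

# For reference, the 'repositories' file parsed above maps repository name to
# a dict of tag -> topmost layer id; a single-entry example (values are
# hypothetical) looks like:
#
#   {"gcr.io/project/app": {"latest": "4e38e38c8ce0...<64 hex chars>"}}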


class FromTarball(FromShardedTarball):
  """This decodes the image tarball output of docker_build for upload."""

  def __init__(self,
               tarball,
               name=None,
               compresslevel=9):
    super(FromTarball, self).__init__(
        lambda unused_id: tarball,
        _get_top(tarball, name),
        name=name,
        compresslevel=compresslevel)
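

# A minimal usage sketch (the tarball path is hypothetical): a tarball that
# contains a single repository and tag needs no explicit name argument.
def _example_from_tarball(path='/tmp/image.tar'):
  """Prints every layer id in the tarball, topmost layer first."""
  with FromTarball(path) as img:
    for layer_id in img.ancestry(img.top()):
      print(layer_id)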


class FromRegistry(DockerImage):
  """This accesses a docker image hosted on a registry (non-local)."""

  def __init__(
      self,
      name,
      basic_creds,
      transport):
    self._name = name
    self._creds = basic_creds
    self._transport = transport
    # Set up in __enter__
    self._tags = {}
    self._response = {}

  def top(self):
    """Override."""
    assert isinstance(self._name, docker_name.Tag)
    return self._tags[self._name.tag]

  def repositories(self):
    """Override."""
    return {self._name.repository: self._tags}

  def tags(self):
    """Lists the tags present in the remote repository."""
    return list(self.raw_tags().keys())

  def raw_tags(self):
    """Dictionary of tag to image id."""
    return self._tags

  def _content(self, suffix):
    if suffix not in self._response:
      _, self._response[suffix] = docker_http.Request(
          self._transport, '{scheme}://{endpoint}/v1/images/{suffix}'.format(
              scheme=docker_http.Scheme(self._endpoint),
              endpoint=self._endpoint,
              suffix=suffix), self._creds, [six.moves.http_client.OK])
    return self._response[suffix]

  def json(self, layer_id):
    """Override."""
    # GET server1/v1/images/IMAGEID/json
    return self._content(layer_id + '/json').decode('utf8')

  # Large, do not memoize.
  def layer(self, layer_id):
    """Override."""
    # GET server1/v1/images/IMAGEID/layer
    return self._content(layer_id + '/layer')

  def ancestry(self, layer_id):
    """Override."""
    # GET server1/v1/images/IMAGEID/ancestry
    return json.loads(self._content(layer_id + '/ancestry').decode('utf8'))

  # __enter__ and __exit__ allow use as a context manager.
  def __enter__(self):
    # This initiates the pull by issuing:
    #   GET H:P/v1/repositories/R/images
    resp, unused_content = docker_http.Request(
        self._transport,
        '{scheme}://{registry}/v1/repositories/{repository_name}/images'.format(
            scheme=docker_http.Scheme(self._name.registry),
            registry=self._name.registry,
            repository_name=self._name.repository), self._creds,
        [six.moves.http_client.OK])

    # The response should have an X-Docker-Token header, which
    # we should extract and annotate subsequent requests with:
    #   Authorization: Token {extracted value}
    self._creds = v1_creds.Token(resp['x-docker-token'])

    self._endpoint = resp['x-docker-endpoints']
    # TODO(user): Consider also supporting cookies, which are
    # used by Quay.io for authenticated sessions.

    # Next, fetch the set of tags in this repository.
    #   GET server1/v1/repositories/R/tags
    resp, content = docker_http.Request(
        self._transport,
        '{scheme}://{endpoint}/v1/repositories/{repository_name}/tags'.format(
            scheme=docker_http.Scheme(self._endpoint),
            endpoint=self._endpoint,
            repository_name=self._name.repository), self._creds,
        [six.moves.http_client.OK])

    self._tags = json.loads(content.decode('utf8'))
    return self

  def __exit__(self, unused_type, unused_value, unused_traceback):
    pass
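

# A minimal usage sketch (the registry name is hypothetical, and anonymous
# credentials are assumed to suffice): entering the context manager performs
# the v1 token handshake before tags can be listed.
def _example_list_tags():
  """Lists the tags of a repository hosted on a v1 registry."""
  name = docker_name.Tag('registry.example.com/project/app:latest')
  with FromRegistry(name, docker_creds.Anonymous(), httplib2.Http()) as img:
    return img.tags()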


class Random(DockerImage):
  """This generates an image with Random properties.

  We ensure basic consistency of the generated docker
  image.
  """

  # TODO(b/36589467): Add function arg for creating blob.
  def __init__(self,
               sample,
               num_layers=5,
               layer_byte_size=64,
               blobs=None):
    # Generate the image.
    self._ancestry = []
    self._layers = {}

    num_layers = len(blobs) if blobs else num_layers
    for i in range(num_layers):
      # Avoid repetitions.
      while True:
        layer_id = self._next_id(sample)
        if layer_id not in self._ancestry:
          self._ancestry += [layer_id]
          blob = blobs[i] if blobs else None
          self._layers[layer_id] = self._next_layer(
              sample, layer_byte_size, blob)
          break

  def top(self):
    """Override."""
    return self._ancestry[0]

  def repositories(self):
    """Override."""
    return {'random/image': {'latest': self.top()}}

  def json(self, layer_id):
    """Override."""
    metadata = {'id': layer_id}

    ancestry = self.ancestry(layer_id)
    if len(ancestry) != 1:
      metadata['parent'] = ancestry[1]

    return json.dumps(metadata, sort_keys=True)

  def layer(self, layer_id):
    """Override."""
    return self._layers[layer_id]

  def ancestry(self, layer_id):
    """Override."""
    assert layer_id in self._ancestry
    index = self._ancestry.index(layer_id)
    return self._ancestry[index:]

  def _next_id(self, sample):
    return sample(b'0123456789abcdef', 64).decode('utf8')

  # pylint: disable=missing-docstring
  def _next_layer(self, sample,
                  layer_byte_size, blob):
    buf = io.BytesIO()

    # TODO(user): Consider doing something more creative...
    with tarfile.open(fileobj=buf, mode='w:gz') as tar:
      if blob:
        info = tarfile.TarInfo(name='./' + self._next_id(sample))
        info.size = len(blob)
        tar.addfile(info, fileobj=io.BytesIO(blob))
      # Linux optimization, use dd for data file creation.
      elif sys.platform.startswith('linux') and layer_byte_size >= 1024 * 1024:
        mb = layer_byte_size // (1024 * 1024)  # dd's count must be integral.
        tempdir = tempfile.mkdtemp()
        data_filename = os.path.join(tempdir, 'a.bin')
        if os.path.exists(data_filename):
          os.remove(data_filename)

        process = subprocess.Popen([
            'dd', 'if=/dev/urandom',
            'of=%s' % data_filename, 'bs=1M',
            'count=%d' % mb
        ])
        process.wait()

        with io.open(data_filename, u'rb') as fd:
          info = tar.gettarinfo(name=data_filename)
          tar.addfile(info, fileobj=fd)
          os.remove(data_filename)
          os.rmdir(tempdir)
      else:
        data = sample(string.printable.encode('utf8'), layer_byte_size)
        info = tarfile.TarInfo(name='./' + self._next_id(sample))
        info.size = len(data)
        tar.addfile(info, fileobj=io.BytesIO(data))

    return buf.getvalue()

  # __enter__ and __exit__ allow use as a context manager.
  def __enter__(self):
    return self

  def __exit__(self, unused_type, unused_value, unused_traceback):
    pass
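

# A minimal usage sketch (the sampler is a hypothetical stand-in for whatever
# byte sampler the caller provides; it draws k single bytes, with replacement,
# from the given population).
def _example_random_image():
  """Builds a small in-memory image with three one-kilobyte layers."""
  import random as stdlib_random

  def _sample(population, k):
    indices = (stdlib_random.randrange(len(population)) for _ in range(k))
    # population[i:i + 1] keeps each draw a length-one bytes object on both
    # Python 2 and Python 3.
    return b''.join(population[i:i + 1] for i in indices)

  with Random(_sample, num_layers=3, layer_byte_size=1024) as img:
    return img.ancestry(img.top())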