HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/third_party/dulwich/index.py
# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import collections
import os
import stat
import struct
import sys
from typing import (
    Any,
    BinaryIO,
    Callable,
    Dict,
    List,
    Optional,
    TYPE_CHECKING,
    Iterable,
    Iterator,
    Tuple,
)

if TYPE_CHECKING:
    from dulwich.object_store import BaseObjectStore

from dulwich.file import GitFile
from dulwich.objects import (
    Blob,
    S_IFGITLINK,
    S_ISGITLINK,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from dulwich.pack import (
    SHA1Reader,
    SHA1Writer,
)


IndexEntry = collections.namedtuple(
    "IndexEntry",
    [
        "ctime",
        "mtime",
        "dev",
        "ino",
        "mode",
        "uid",
        "gid",
        "size",
        "sha",
        "flags",
    ],
)


FLAG_STAGEMASK = 0x3000
FLAG_VALID = 0x8000
FLAG_EXTENDED = 0x4000


DEFAULT_VERSION = 2


def pathsplit(path):
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.
    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args):
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])


def read_cache_time(f):
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f, t):
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))


def read_cache_entry(f):
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
    Returns:
      tuple with: device, inode, mode, uid, gid, size, sha, flags
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    name = f.read((flags & 0x0FFF))
    # Padding:
    real_size = (f.tell() - beginoffset + 8) & ~7
    f.read((beginoffset + real_size) - f.tell())
    return (
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~0x0FFF,
    )


def write_cache_entry(f, entry):
    """Write an index entry to a file.

    Args:
      f: File object
      entry: Entry to write, tuple with:
        (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags)
    """
    beginoffset = f.tell()
    (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry
    write_cache_time(f, ctime)
    write_cache_time(f, mtime)
    flags = len(name) | (flags & ~0x0FFF)
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            dev & 0xFFFFFFFF,
            ino & 0xFFFFFFFF,
            mode,
            uid,
            gid,
            size,
            hex_to_sha(sha),
            flags,
        )
    )
    f.write(name)
    real_size = (f.tell() - beginoffset + 8) & ~7
    f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


def read_index(f: BinaryIO):
    """Read an index file, yielding the individual entries."""
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % header)
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    assert version in (1, 2)
    for i in range(num_entries):
        yield read_cache_entry(f)


def read_index_dict(f):
    """Read an index file and return it as a dictionary.

    Args:
      f: File object to read from
    """
    ret = {}
    for x in read_index(f):
        ret[x[0]] = IndexEntry(*x[1:])
    return ret


def write_index(f: BinaryIO, entries: List[Any], version: Optional[int] = None):
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
    """
    if version is None:
        version = DEFAULT_VERSION
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    for x in entries:
        write_cache_entry(f, x)


def write_index_dict(
    f: BinaryIO,
    entries: Dict[bytes, IndexEntry],
    version: Optional[int] = None,
) -> None:
    """Write an index file based on the contents of a dictionary."""
    entries_list = []
    for name in sorted(entries):
        entries_list.append((name,) + tuple(entries[name]))
    write_index(f, entries_list, version=version)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.
    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret


class Index(object):
    """A Git Index file."""

    def __init__(self, filename):
        """Open an index file.

        Args:
          filename: Path to the index file
        """
        self._filename = filename
        # TODO(user): Store the version returned by read_index
        self._version = None
        self.clear()
        self.read()

    @property
    def path(self):
        return self._filename

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._filename)

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            f = SHA1Writer(f)
            write_index_dict(f, self._byname, version=self._version)
        finally:
            f.close()

    def read(self):
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            f = SHA1Reader(f)
            for x in read_index(f):
                self[x[0]] = IndexEntry(*x[1:])
            # FIXME: Additional data?
            f.read(os.path.getsize(self._filename) - f.tell() - 20)
            f.check_sha()
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, name: bytes) -> IndexEntry:
        """Retrieve entry by relative path.

        Returns: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha,
            flags)
        """
        return self._byname[name]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths in this index."""
        return iter(self._byname)

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        return self[path].sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        return self[path].mode

    def iterobjects(self) -> Iterable[Tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            yield path, entry.sha, cleanup_mode(entry.mode)

    def iterblobs(self):
        import warnings

        warnings.warn("Use iterobjects() instead.", PendingDeprecationWarning)
        return self.iterobjects()

    def clear(self):
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(self, name, x):
        assert isinstance(name, bytes)
        assert len(x) == 10
        # Remove the old entry if any
        self._byname[name] = IndexEntry(*x)

    def __delitem__(self, name):
        assert isinstance(name, bytes)
        del self._byname[name]

    def iteritems(self):
        return self._byname.items()

    def items(self):
        return self._byname.items()

    def update(self, entries):
        for name, value in entries.items():
            self[name] = value

    def changes_from_tree(self, object_store, tree, want_unchanged=False):
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path):
            entry = self[path]
            return entry.sha, cleanup_mode(entry.mode)

        for (name, mode, sha) in changes_from_tree(
            self._byname.keys(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        ):
            yield (name, mode, sha)

    def commit(self, object_store):
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())


def commit_tree(
    object_store: "BaseObjectStore", blobs: Iterable[Tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees = {b"": {}}  # type: Dict[bytes, Any]

    def add_tree(path):
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path):
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")


def commit_index(object_store: "BaseObjectStore", index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], Tuple[bytes, int]],
    object_store: "BaseObjectStore",
    tree: Optional[bytes],
    want_unchanged=False,
) -> Iterable[
    Tuple[
        Tuple[Optional[bytes], Optional[bytes]],
        Tuple[Optional[int], Optional[int]],
        Tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(user): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for (name, mode, sha) in object_store.iter_tree_contents(tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val, hex_sha: bytes, flags: int, mode: Optional[int] = None
):
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      flags: Index flags
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        stat_val.st_ctime,
        stat_val.st_mtime,
        stat_val.st_dev,
        stat_val.st_ino,
        mode,
        stat_val.st_uid,
        stat_val.st_gid,
        stat_val.st_size,
        hex_sha,
        flags,
    )


def build_file_from_blob(
    blob, mode, target_path, honor_filemode=True, tree_encoding="utf-8"
):
    """Build a file or symlink on disk based on a Git object.

    Args:
      obj: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        # FIXME: This will fail on Windows. What should we do instead?
        if oldstat:
            os.unlink(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents = contents.decode(tree_encoding)
            target_path = target_path.decode(tree_encoding)
        os.symlink(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element):
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element):
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    if stripped == b"git~1":
        return False
    return True


def validate_path(path, element_validator=validate_path_element_default):
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True


def build_index_from_tree(
    root_path,
    index_path,
    object_store,
    tree_id,
    honor_filemode=True,
    validate_path_element=validate_path_element_default,
):
    """Generate and materialize index from a tree

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """

    index = Index(index_path)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in object_store.iter_tree_contents(tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(user): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(user): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            st = build_file_from_blob(
                obj, entry.mode, full_path, honor_filemode=honor_filemode
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        index[entry.path] = index_entry_from_stat(st, entry.sha, 0)

    index.write()


def blob_from_path_and_mode(fs_path, mode, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            fs_path = os.fsdecode(fs_path)
            blob.data = os.readlink(fs_path).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(fs_path, st, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path):
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from dulwich.errors import NotGitRepository
    from dulwich.repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(user): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path, entry):
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it's has changed. If
    not, consider the file as changed as Git tracked a file and not a
    directory.

    Return true if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


def get_unstaged_changes(index: Index, root_path, filter_blob_callback=None):
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path


os_sep_bytes = os.sep.encode("ascii")


def _tree_to_fs_path(root_path, tree_path: bytes):
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path
    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path):
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.

    Returns:  Git tree path as bytes
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path
    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path


def index_entry_from_path(path, object_store=None):
    """Create an index from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. for directories and
    non-existant files it returns None

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        if os.path.exists(os.path.join(path, b".git")):
            head = read_submodule_head(path)
            if head is None:
                return None
            return index_entry_from_stat(st, head, 0, mode=S_IFGITLINK)
        return None

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id, 0)

    return None


def iter_fresh_entries(
    paths, root_path, object_store: Optional["BaseObjectStore"] = None
):
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry


def iter_fresh_blobs(index, root_path):
    """Iterate over versions of blobs on disk referenced by index.

    Don't use this function; it removes missing entries from index.

    Args:
      index: Index file
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
    Returns: Iterator over path, sha, mode
    """
    import warnings

    warnings.warn(PendingDeprecationWarning, "Use iter_fresh_objects instead.")
    for entry in iter_fresh_objects(index, root_path, include_deleted=True):
        if entry[1] is None:
            del index[entry[0]]
        else:
            yield entry


def iter_fresh_objects(paths, root_path, include_deleted=False, object_store=None):
    """Iterate over versions of objecs on disk referenced by index.

    Args:
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            entry = IndexEntry(*entry)
            yield path, entry.sha, cleanup_mode(entry.mode)


def refresh_index(index, root_path):
    """Refresh the contents of an index.

    This is the equivalent to running 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        index[path] = path