File: //proc/thread-self/root/snap/google-cloud-cli/current/lib/third_party/dulwich/index.py
# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Parser for the git index file format."""
import collections
import os
import stat
import struct
import sys
from typing import (
Any,
BinaryIO,
Callable,
Dict,
List,
Optional,
TYPE_CHECKING,
Iterable,
Iterator,
Tuple,
)
if TYPE_CHECKING:
from dulwich.object_store import BaseObjectStore
from dulwich.file import GitFile
from dulwich.objects import (
Blob,
S_IFGITLINK,
S_ISGITLINK,
Tree,
hex_to_sha,
sha_to_hex,
)
from dulwich.pack import (
SHA1Reader,
SHA1Writer,
)
# A single index entry: the cached stat fields for a path, plus the object
# sha (hex) and the per-entry flag bits.
IndexEntry = collections.namedtuple(
    "IndexEntry",
    [
        "ctime",
        "mtime",
        "dev",
        "ino",
        "mode",
        "uid",
        "gid",
        "size",
        "sha",
        "flags",
    ],
)
# Bits of the 16-bit 'flags' field (see the git index file format):
FLAG_STAGEMASK = 0x3000  # merge stage of the entry
FLAG_VALID = 0x8000  # "assume valid" bit
FLAG_EXTENDED = 0x4000  # extended flags present (index version 3 and up)
# Index format version written when the caller does not specify one.
DEFAULT_VERSION = 2
def pathsplit(path):
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.
    Returns:
      Tuple with directory name and basename
    """
    parts = path.rsplit(b"/", 1)
    if len(parts) == 1:
        # No separator at all: the whole path is the basename.
        return (b"", path)
    return (parts[0], parts[1])
def pathjoin(*args):
    """Join a /-delimited path, skipping empty components."""
    nonempty = [component for component in args if component]
    return b"/".join(nonempty)
def read_cache_time(f):
    """Read a cache time from a file.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    raw = f.read(8)
    # Two big-endian 32-bit unsigned ints: seconds, nanoseconds.
    return struct.unpack(">LL", raw)
def write_cache_time(f, t):
    """Write a cache time to a file.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    Raises:
      TypeError: if t is not an int, float or tuple
    """
    if isinstance(t, float):
        # Split into whole seconds plus a nanosecond remainder.
        whole, fraction = divmod(t, 1.0)
        t = (int(whole), int(fraction * 1000000000))
    elif isinstance(t, int):
        t = (t, 0)
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
def read_cache_entry(f):
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
    Returns:
      tuple with: name, ctime, mtime, dev, ino, mode, uid, gid, size,
      hex sha, flags (with the name-length bits masked out)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    # Fixed-width portion: six 32-bit fields, the 20-byte binary sha and
    # the 16-bit flags word.
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    # The low 12 bits of flags hold the path-name length.
    name = f.read((flags & 0x0FFF))
    # Padding: entries are NUL-padded so each occupies a multiple of 8
    # bytes; skip the pad bytes to position at the next entry.
    real_size = (f.tell() - beginoffset + 8) & ~7
    f.read((beginoffset + real_size) - f.tell())
    return (
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        # Mask out the name-length bits; only the flag bits remain.
        flags & ~0x0FFF,
    )
def write_cache_entry(f, entry):
    """Write an index entry to a file.

    Args:
      f: File object
      entry: Entry to write, tuple with:
        (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags)
    """
    beginoffset = f.tell()
    (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry
    write_cache_time(f, ctime)
    write_cache_time(f, mtime)
    # Store the name length in the low 12 bits of flags, keeping the
    # caller-supplied flag bits in the upper bits.
    flags = len(name) | (flags & ~0x0FFF)
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            # dev/ino can exceed 32 bits on some platforms; truncate to
            # fit the fixed-width on-disk fields.
            dev & 0xFFFFFFFF,
            ino & 0xFFFFFFFF,
            mode,
            uid,
            gid,
            size,
            hex_to_sha(sha),
            flags,
        )
    )
    f.write(name)
    # NUL-pad the entry to a multiple of 8 bytes (1 to 8 pad bytes),
    # mirroring the padding skipped by read_cache_entry.
    real_size = (f.tell() - beginoffset + 8) & ~7
    f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
def read_index(f: BinaryIO):
    """Read an index file, yielding the individual entries."""
    magic = f.read(4)
    if magic != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % magic)
    # Header continues with the format version and the entry count.
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    assert version in (1, 2)
    remaining = num_entries
    while remaining > 0:
        yield read_cache_entry(f)
        remaining -= 1
def read_index_dict(f):
    """Read an index file and return it as a dictionary.

    Args:
      f: File object to read from
    Returns:
      Dictionary mapping path (bytes) to IndexEntry
    """
    return {entry[0]: IndexEntry(*entry[1:]) for entry in read_index(f)}
def write_index(f: BinaryIO, entries: List[Any], version: Optional[int] = None):
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: List of entries to write (name followed by the
        IndexEntry fields)
      version: Version number to write; defaults to DEFAULT_VERSION
    """
    actual_version = DEFAULT_VERSION if version is None else version
    # Header: magic, version, entry count.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", actual_version, len(entries)))
    for entry in entries:
        write_cache_entry(f, entry)
def write_index_dict(
    f: BinaryIO,
    entries: Dict[bytes, IndexEntry],
    version: Optional[int] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are written sorted by path, as the index format requires.
    """
    ordered = [(name,) + tuple(entries[name]) for name in sorted(entries)]
    write_index(f, ordered, version=version)
def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.
    Returns:
      mode
    """
    # Symlinks, directories and submodule links keep only their type bits.
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    # Regular files are normalized to 0644, or 0755 when any execute bit
    # (owner x) is set.
    if mode & 0o100:
        return stat.S_IFREG | 0o644 | 0o111
    return stat.S_IFREG | 0o644
class Index(object):
    """A Git Index file.

    In-memory mapping of path (bytes) to IndexEntry, backed by the
    on-disk index file with its trailing SHA-1 checksum.
    """

    def __init__(self, filename):
        """Open an index file.

        Args:
          filename: Path to the index file
        """
        self._filename = filename
        # TODO(user): Store the version returned by read_index
        self._version = None
        self.clear()
        self.read()

    @property
    def path(self):
        """Path to the index file on disk."""
        return self._filename

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._filename)

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # SHA1Writer appends a checksum over everything written.
            f = SHA1Writer(f)
            write_index_dict(f, self._byname, version=self._version)
        finally:
            f.close()

    def read(self):
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            # A missing index file is treated as an empty index.
            return
        f = GitFile(self._filename, "rb")
        try:
            f = SHA1Reader(f)
            for x in read_index(f):
                self[x[0]] = IndexEntry(*x[1:])
            # FIXME: Additional data?
            # Skip everything up to the trailing 20-byte checksum so
            # check_sha() sees the right position.
            f.read(os.path.getsize(self._filename) - f.tell() - 20)
            f.check_sha()
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, name: bytes) -> IndexEntry:
        """Retrieve entry by relative path.

        Returns: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha,
            flags)
        """
        return self._byname[name]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths in this index."""
        return iter(self._byname)

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        return self[path].sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        return self[path].mode

    def iterobjects(self) -> Iterable[Tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            yield path, entry.sha, cleanup_mode(entry.mode)

    def iterblobs(self):
        """Deprecated alias for iterobjects()."""
        import warnings

        warnings.warn("Use iterobjects() instead.", PendingDeprecationWarning)
        return self.iterobjects()

    def clear(self):
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(self, name, x):
        """Set an entry; x must be a 10-tuple matching IndexEntry's fields."""
        assert isinstance(name, bytes)
        assert len(x) == 10
        # Remove the old entry if any
        self._byname[name] = IndexEntry(*x)

    def __delitem__(self, name):
        assert isinstance(name, bytes)
        del self._byname[name]

    def iteritems(self):
        # Kept for backwards compatibility; same as items().
        return self._byname.items()

    def items(self):
        return self._byname.items()

    def update(self, entries):
        """Update this index from a {path: entry} mapping."""
        for name, value in entries.items():
            self[name] = value

    def changes_from_tree(self, object_store, tree, want_unchanged=False):
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path):
            entry = self[path]
            return entry.sha, cleanup_mode(entry.mode)

        for (name, mode, sha) in changes_from_tree(
            self._byname.keys(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        ):
            yield (name, mode, sha)

    def commit(self, object_store):
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
def commit_tree(
    object_store: "BaseObjectStore", blobs: Iterable[Tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Nested dicts mirroring the directory structure; leaf values are
    # (mode, sha) tuples.
    trees = {b"": {}}  # type: Dict[bytes, Any]

    def add_tree(path):
        # Return the dict for *path*, creating parent dicts as needed.
        try:
            return trees[path]
        except KeyError:
            dirname, basename = pathsplit(path)
            parent = add_tree(dirname)
            assert isinstance(basename, bytes)
            subtree = {}
            parent[basename] = subtree
            trees[path] = subtree
            return subtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        add_tree(tree_path)[basename] = (mode, sha)

    def build_tree(path):
        # Recursively serialize the nested dicts into Tree objects,
        # bottom-up, storing each in the object store.
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                subtree_sha = build_tree(pathjoin(path, basename))
                tree.add(basename, stat.S_IFDIR, subtree_sha)
            else:
                (mode, sha) = entry
                tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
def commit_index(object_store: "BaseObjectStore", index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file

    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    blobs = index.iterobjects()
    return commit_tree(object_store, blobs)
def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], Tuple[bytes, int]],
    object_store: "BaseObjectStore",
    tree: Optional[bytes],
    want_unchanged=False,
) -> Iterable[
    Tuple[
        Tuple[Optional[bytes], Optional[bytes]],
        Tuple[Optional[int], Optional[int]],
        Tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
      (oldsha, newsha)
    """
    # TODO(user): Support a include_trees option
    remaining = set(names)
    if tree is not None:
        for (name, mode, sha) in object_store.iter_tree_contents(tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # In the tree but not in the working copy: removed.
                yield ((name, None), (mode, None), (sha, None))
                continue
            remaining.remove(name)
            if want_unchanged or other_sha != sha or other_mode != mode:
                yield ((name, name), (mode, other_mode), (sha, other_sha))
    # Whatever is left in the working copy was not in the tree: added.
    for name in remaining:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            continue
        yield ((None, name), (None, other_mode), (None, other_sha))
def index_entry_from_stat(
    stat_val, hex_sha: bytes, flags: int, mode: Optional[int] = None
):
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      flags: Index flags
      mode: Optional mode override; derived from stat_val when omitted
    Returns:
      An IndexEntry
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)
    fields = (
        stat_val.st_ctime,
        stat_val.st_mtime,
        stat_val.st_dev,
        stat_val.st_ino,
        mode,
        stat_val.st_uid,
        stat_val.st_gid,
        stat_val.st_size,
        hex_sha,
        flags,
    )
    return IndexEntry(*fields)
def build_file_from_blob(
    blob, mode, target_path, honor_filemode=True, tree_encoding="utf-8"
):
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The blob whose contents to materialize
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode symlink targets on Windows
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        # FIXME: This will fail on Windows. What should we do instead?
        if oldstat:
            # os.symlink refuses to overwrite; remove the old file first.
            os.unlink(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents = contents.decode(tree_encoding)
            target_path = target_path.decode(tree_encoding)
        os.symlink(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            # Short-circuit: if size and bytes already match, leave the
            # file untouched so its stat times remain valid.
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat
        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)
        if honor_filemode:
            os.chmod(target_path, mode)
    return os.lstat(target_path)
# Path elements that must never be checked out (case-insensitively).
INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element):
    """Reject empty, dot, dot-dot and .git path elements."""
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element):
    """Like the default validator, with extra checks for NTFS quirks."""
    # NTFS ignores trailing dots and spaces; GIT~1 is the 8.3 short
    # name that can alias .git.
    stripped = element.rstrip(b". ").lower()
    return stripped not in INVALID_DOTNAMES and stripped != b"git~1"


def validate_path(path, element_validator=validate_path_element_default):
    """Default path validator that just checks for .git/."""
    return all(element_validator(p) for p in path.split(b"/"))
def build_index_from_tree(
    root_path,
    index_path,
    object_store,
    tree_id,
    honor_filemode=True,
    validate_path_element=validate_path_element_default,
):
    """Generate and materialize index from a tree

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)
    for entry in object_store.iter_tree_contents(tree_id):
        # Skip paths that could escape the checkout or touch .git.
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)
        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))
        # TODO(user): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            # Submodule: create only the directory, do not recurse into it.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(user): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            st = build_file_from_blob(
                obj, entry.mode, full_path, honor_filemode=honor_filemode
            )
        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        index[entry.path] = index_entry_from_stat(st, entry.sha, 0)
    index.write()
def blob_from_path_and_mode(fs_path, mode, tree_encoding="utf-8"):
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode (used to detect symlinks)
      tree_encoding: Encoding used for symlink targets on Windows
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if not stat.S_ISLNK(mode):
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    elif sys.platform == "win32":
        # os.readlink on Python3 on Windows requires a unicode string.
        blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
    else:
        blob.data = os.readlink(fs_path)
    return blob
def blob_from_path_and_stat(fs_path, st, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    # Delegate, using only the mode from the stat result.
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
def read_submodule_head(path):
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from dulwich.errors import NotGitRepository
    from dulwich.repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(user): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        head = repo.head()
    except KeyError:
        # Repository exists but has no HEAD yet.
        return None
    return head
def _has_directory_changed(tree_path, entry):
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it's has changed. If
    not, consider the file as changed as Git tracked a file and not a
    directory.

    Return true if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    if not os.path.exists(os.path.join(tree_path, b".git")):
        # A tracked file was replaced by a plain directory: changed.
        return True
    # Submodule: changed iff its HEAD no longer matches the index entry.
    return read_submodule_head(tree_path) != entry.sha
def get_unstaged_changes(index: Index, root_path, filter_blob_callback=None):
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callable applied to each blob before
        comparing (receives blob and tree path)
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)
    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                # A directory where a file was tracked: submodule or
                # replaced file; delegate the decision.
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue
            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Special files (fifos, sockets, ...) cannot be hashed.
                continue
            blob = blob_from_path_and_stat(full_path, st)
            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            # Only reached when no exception occurred above: compare the
            # freshly-hashed blob against the staged sha.
            if blob.id != entry.sha:
                yield tree_path
# Native path separator as bytes, for converting git paths to fs paths.
os_sep_bytes = os.sep.encode("ascii")


def _tree_to_fs_path(root_path, tree_path: bytes):
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes
    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes == b"/":
        rel_path = tree_path
    else:
        # Translate git's "/" separators into the platform's separator.
        rel_path = tree_path.replace(b"/", os_sep_bytes)
    return os.path.join(root_path, rel_path)
def _fs_to_tree_path(fs_path):
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.
    Returns: Git tree path as bytes
    """
    if isinstance(fs_path, bytes):
        path_bytes = fs_path
    else:
        path_bytes = os.fsencode(fs_path)
    if os_sep_bytes == b"/":
        return path_bytes
    # Translate the platform's separator back into git's "/".
    return path_bytes.replace(os_sep_bytes, b"/")
def index_entry_from_path(path, object_store=None):
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For directories and
    non-existent files it returns None

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        # Only submodules (directories containing .git) get an entry.
        if not os.path.exists(os.path.join(path, b".git")):
            return None
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, 0, mode=S_IFGITLINK)
    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id, 0)
    # Special files (fifos, sockets, ...) are not representable.
    return None
def iter_fresh_entries(
    paths, root_path, object_store: Optional["BaseObjectStore"] = None
):
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for tree_path in paths:
        fs_path = _tree_to_fs_path(root_path, tree_path)
        try:
            fresh_entry = index_entry_from_path(fs_path, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            # Gone or turned into a plain directory: report as missing.
            fresh_entry = None
        yield tree_path, fresh_entry
def iter_fresh_blobs(index, root_path):
"""Iterate over versions of blobs on disk referenced by index.
Don't use this function; it removes missing entries from index.
Args:
index: Index file
root_path: Root path to access from
include_deleted: Include deleted entries with sha and
mode set to None
Returns: Iterator over path, sha, mode
"""
import warnings
warnings.warn(PendingDeprecationWarning, "Use iter_fresh_objects instead.")
for entry in iter_fresh_objects(index, root_path, include_deleted=True):
if entry[1] is None:
del index[entry[0]]
else:
yield entry
def iter_fresh_objects(paths, root_path, include_deleted=False, object_store=None):
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    fresh = iter_fresh_entries(paths, root_path, object_store=object_store)
    for path, entry in fresh:
        if entry is None:
            if include_deleted:
                yield path, None, None
            continue
        entry = IndexEntry(*entry)
        yield path, entry.sha, cleanup_mode(entry.mode)
def refresh_index(index, root_path):
    """Refresh the contents of an index.

    This is the equivalent to running 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        # Bug fix: this previously assigned the *path* instead of the fresh
        # entry, which trips Index.__setitem__'s len(x) == 10 assertion (or
        # silently corrupts the index for 10-byte paths). Entries that are
        # missing on disk (entry is None) are left untouched.
        if entry is not None:
            index[path] = entry