File: //proc/thread-self/root/snap/google-cloud-cli/394/lib/surface/auth/git_helper.py
# -*- coding: utf-8 -*- #
# Copyright 2013 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A git credential helper that provides Google git repository passwords.
Reads a session from stdin that looks a lot like:
protocol=https
host=code.google.com
And writes out a session to stdout that looks a lot like:
username=me
password=secret
Errors will be reported on stderr.
Note that spaces may be part of key names so, for example, "protocol" must not
be proceeded by leading spaces.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import re
import subprocess
import sys
import textwrap
from googlecloudsdk.api_lib.auth import exceptions as auth_exceptions
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions as c_exc
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.credentials import creds as c_creds
from googlecloudsdk.core.credentials import exceptions as creds_exceptions
from googlecloudsdk.core.credentials import store as c_store
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import platforms
from oauth2client import client
import six
_KEYVAL_RE = re.compile(r'(.+)=(.*)')
_BLANK_LINE_RE = re.compile(r'^ *$')
@base.Hidden
@base.DefaultUniverseOnly
class GitHelper(base.Command):
"""A git credential helper to provide access to Google git repositories."""
GET = 'get'
STORE = 'store'
METHODS = [GET, STORE]
GOOGLESOURCE = 'googlesource.com'
@staticmethod
def Args(parser):
parser.add_argument('method',
help='The git credential helper method.')
parser.add_argument('--ignore-unknown',
action='store_true',
help=('Produce no output and exit with 0 when given '
'an unknown method (e.g. store) or host.'))
@c_exc.RaiseErrorInsteadOf(auth_exceptions.AuthenticationError, client.Error)
def Run(self, args):
"""Run the helper command."""
# Disable self signed jwt for this command.
properties.VALUES.auth.service_account_use_self_signed_jwt.Set(False)
if args.method not in GitHelper.METHODS:
if args.ignore_unknown:
return
raise auth_exceptions.GitCredentialHelperError(
'Unexpected method [{meth}]. One of [{methods}] expected.'
.format(meth=args.method, methods=', '.join(GitHelper.METHODS)))
info = self._ParseInput()
credentialed_domains = [
'source.developers.google.com',
GitHelper.GOOGLESOURCE, # Requires a different username value.
]
credentialed_domains_suffix = [
'.sourcemanager.dev',
'.blueoryx.dev',
'.developerconnect.dev',
'.'+GitHelper.GOOGLESOURCE,
]
extra = properties.VALUES.core.credentialed_hosted_repo_domains.Get()
if extra:
credentialed_domains.extend(extra.split(','))
host = info.get('host')
def _ValidateHost(host):
if host in credentialed_domains:
return True
for suffix in credentialed_domains_suffix:
if host.endswith(suffix):
return True
return False
if not _ValidateHost(host):
if not args.ignore_unknown:
raise auth_exceptions.GitCredentialHelperError(
'Unknown host [{host}].'.format(host=host))
return
if args.method == GitHelper.GET:
account = properties.VALUES.core.account.Get()
try:
cred = c_store.Load(account, use_google_auth=True)
c_store.Refresh(cred)
except creds_exceptions.Error as e:
sys.stderr.write(textwrap.dedent("""\
ERROR: {error}
Run 'gcloud auth login' to log in.
""".format(error=six.text_type(e))))
return
self._CheckNetrc()
# For googlesource.com, any username beginning with "git-" is accepted
# and the identity of the user is extracted from the token server-side.
if (host == GitHelper.GOOGLESOURCE
or host.endswith('.'+GitHelper.GOOGLESOURCE)):
sent_account = 'git-account'
else:
sent_account = account
if c_creds.IsOauth2ClientCredentials(cred):
access_token = cred.access_token
else:
access_token = cred.token
sys.stdout.write(
textwrap.dedent("""\
username={username}
password={password}
""").format(username=sent_account, password=access_token))
elif args.method == GitHelper.STORE:
# On OSX, there is an additional credential helper that gets called before
# ours does. When we return a token, it gets cached there. Git continues
# to get it from there first until it expires. That command then fails,
# and the token is deleted, but it does not retry the operation. The next
# command gets a new token from us and it starts working again, for an
# hour. This erases our credential from the other cache whenever 'store'
# is called on us. Because they are called first, the token will already
# be stored there, and so we can successfully erase it to prevent caching.
if (platforms.OperatingSystem.Current() ==
platforms.OperatingSystem.MACOSX):
log.debug('Clearing OSX credential cache.')
try:
input_string = 'protocol={protocol}\nhost={host}\n\n'.format(
protocol=info.get('protocol'), host=info.get('host'))
log.debug('Calling erase with input:\n%s', input_string)
p = subprocess.Popen(['git-credential-osxkeychain', 'erase'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(out, err) = p.communicate(input_string)
if p.returncode:
log.debug(
'Failed to clear OSX keychain:\nstdout: {%s}\nstderr: {%s}',
out, err)
# pylint:disable=broad-except, This can fail and should only be done as
# best effort.
except Exception as e:
log.debug('Failed to clear OSX keychain', exc_info=True)
def _ParseInput(self):
"""Parse the fields from stdin.
Returns:
{str: str}, The parsed parameters given on stdin.
"""
info = {}
for line in sys.stdin:
if _BLANK_LINE_RE.match(line):
continue
match = _KEYVAL_RE.match(line)
if not match:
raise auth_exceptions.GitCredentialHelperError(
'Invalid input line format: [{format}].'
.format(format=line.rstrip('\n')))
key, val = match.groups()
info[key] = val.strip()
if 'protocol' not in info:
raise auth_exceptions.GitCredentialHelperError(
'Required key "protocol" missing.')
if 'host' not in info:
raise auth_exceptions.GitCredentialHelperError(
'Required key "host" missing.')
if info.get('protocol') != 'https':
raise auth_exceptions.GitCredentialHelperError(
'Invalid protocol [{p}]. "https" expected.'
.format(p=info.get('protocol')))
return info
def _CheckNetrc(self):
"""Warn on stderr if ~/.netrc contains redundant credentials."""
def Check(p):
"""Warn about other credential helpers that will be ignored."""
if not os.path.exists(p):
return
try:
data = files.ReadFileContents(p)
if 'source.developers.google.com' in data:
sys.stderr.write(textwrap.dedent("""\
You have credentials for your Google repository in [{path}]. This repository's
git credential helper is set correctly, so the credentials in [{path}] will not
be used, but you may want to remove them to avoid confusion.
""".format(path=p)))
# pylint:disable=broad-except, If something went wrong, forget about it.
except Exception:
pass
Check(files.ExpandHomeDir(os.path.join('~', '.netrc')))
Check(files.ExpandHomeDir(os.path.join('~', '_netrc')))