HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/third_party/pyu2f/convenience/customauthenticator.py
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Class to offload the end to end flow of U2F signing."""

import base64
import hashlib
import json
import os
import struct
import subprocess
import sys

from pyu2f import errors
from pyu2f import model
from pyu2f.convenience import baseauthenticator

SK_SIGNING_PLUGIN_ENV_VAR = 'SK_SIGNING_PLUGIN'
U2F_SIGNATURE_TIMEOUT_SECONDS = 5

SK_SIGNING_PLUGIN_NO_ERROR = 0
SK_SIGNING_PLUGIN_TOUCH_REQUIRED = 0x6985
SK_SIGNING_PLUGIN_WRONG_DATA = 0x6A80


class CustomAuthenticator(baseauthenticator.BaseAuthenticator):
  """Offloads U2F signing to a pluggable command-line tool.

  Offloads U2F signing to a signing plugin which takes the form of a
  command-line tool. The command-line tool is configurable via the
  SK_SIGNING_PLUGIN environment variable.

  The signing plugin should implement the following interface:

  Communication occurs over stdin/stdout, and messages are both sent and
  received in the form:

  [4 bytes - payload size (little-endian)][variable bytes - json payload]

  Signing Request JSON
  {
    "type": "sign_helper_request",
    "signData": [{
        "keyHandle": <url-safe base64-encoded key handle>,
        "appIdHash": <url-safe base64-encoded SHA-256 hash of application ID>,
        "challengeHash": <url-safe base64-encoded SHA-256 hash of ClientData>,
        "version": U2F protocol version (usually "U2F_V2")
        },...],
    "timeoutSeconds": <security key touch timeout>
  }

  Signing Response JSON
  {
    "type": "sign_helper_reply",
    "code": <result code>.
    "errorDetail": <text description of error>,
    "responseData": {
      "appIdHash": <url-safe base64-encoded SHA-256 hash of application ID>,
      "challengeHash": <url-safe base64-encoded SHA-256 hash of ClientData>,
      "keyHandle": <url-safe base64-encoded key handle>,
      "version": <U2F protocol version>,
      "signatureData": <url-safe base64-encoded signature>
    }
  }

  Possible response error codes are:

    NoError            = 0
    UnknownError       = -127
    TouchRequired      = 0x6985
    WrongData          = 0x6a80
  """

  def __init__(self, origin):
    self.origin = origin

  def Authenticate(self, app_id, challenge_data,
                   print_callback=sys.stderr.write):
    """See base class."""

    # Ensure environment variable is present
    plugin_cmd = os.environ.get(SK_SIGNING_PLUGIN_ENV_VAR)
    if plugin_cmd is None:
      raise errors.PluginError('{} env var is not set'
                               .format(SK_SIGNING_PLUGIN_ENV_VAR))

    # Prepare input to signer
    client_data_map, signing_input = self._BuildPluginRequest(
        app_id, challenge_data, self.origin)

    # Call plugin
    print_callback('Please insert and touch your security key\n')
    response = self._CallPlugin([plugin_cmd], signing_input)

    # Handle response
    key_challenge_pair = (response['keyHandle'], response['challengeHash'])
    client_data_json = client_data_map[key_challenge_pair]
    client_data = client_data_json.encode()
    return self._BuildAuthenticatorResponse(app_id, client_data, response)

  def IsAvailable(self):
    """See base class."""
    return os.environ.get(SK_SIGNING_PLUGIN_ENV_VAR) is not None

  def _BuildPluginRequest(self, app_id, challenge_data, origin):
    """Builds a JSON request in the form that the plugin expects."""
    client_data_map = {}
    encoded_challenges = []
    app_id_hash_encoded = self._Base64Encode(self._SHA256(app_id))
    for challenge_item in challenge_data:
      key = challenge_item['key']
      key_handle_encoded = self._Base64Encode(key.key_handle)

      raw_challenge = challenge_item['challenge']

      client_data_json = model.ClientData(
          model.ClientData.TYP_AUTHENTICATION,
          raw_challenge,
          origin).GetJson()

      challenge_hash_encoded = self._Base64Encode(
          self._SHA256(client_data_json))

      # Populate challenges list
      encoded_challenges.append({
          'appIdHash': app_id_hash_encoded,
          'challengeHash': challenge_hash_encoded,
          'keyHandle': key_handle_encoded,
          'version': key.version,
      })

      # Populate ClientData map
      key_challenge_pair = (key_handle_encoded, challenge_hash_encoded)
      client_data_map[key_challenge_pair] = client_data_json

    signing_request = {
        'type': 'sign_helper_request',
        'signData': encoded_challenges,
        'timeoutSeconds': U2F_SIGNATURE_TIMEOUT_SECONDS,
        'localAlways': True
    }

    return client_data_map, json.dumps(signing_request)

  def _BuildAuthenticatorResponse(self, app_id, client_data, plugin_response):
    """Builds the response to return to the caller."""
    encoded_client_data = self._Base64Encode(client_data)
    signature_data = str(plugin_response['signatureData'])
    key_handle = str(plugin_response['keyHandle'])

    response = {
        'clientData': encoded_client_data,
        'signatureData': signature_data,
        'applicationId': app_id,
        'keyHandle': key_handle,
    }
    return response

  def _CallPlugin(self, cmd, input_json):
    """Calls the plugin and validates the response."""
    # Calculate length of input
    input_length = len(input_json)
    length_bytes_le = struct.pack('<I', input_length)
    request = length_bytes_le + input_json.encode()

    # Call plugin
    sign_process = subprocess.Popen(cmd,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE)

    stdout = sign_process.communicate(request)[0]
    exit_status = sign_process.wait()

    # Parse and validate response size
    response_len_le = stdout[:4]
    response_len = struct.unpack('<I', response_len_le)[0]
    response = stdout[4:]
    if response_len != len(response):
      raise errors.PluginError(
          'Plugin response length {} does not match data {} (exit_status={})'
          .format(response_len, len(response), exit_status))

    # Ensure valid json
    try:
      json_response = json.loads(response.decode())
    except ValueError:
      raise errors.PluginError('Plugin returned invalid output (exit_status={})'
                               .format(exit_status))

    # Ensure response type
    if json_response.get('type') != 'sign_helper_reply':
      error_string = 'Plugin returned invalid response type (exit_status={})'.format(exit_status)
      error_detail = json_response.get('errorDetail')
      if (error_detail):
        error_string += '. Additional information:\n' + error_detail
      raise errors.PluginError(error_string)

    # Parse response codes
    result_code = json_response.get('code')
    if result_code is None:
      raise errors.PluginError('Plugin missing result code (exit_status={})'
                               .format(exit_status))

    # Handle errors
    if result_code == SK_SIGNING_PLUGIN_TOUCH_REQUIRED:
      raise errors.U2FError(errors.U2FError.TIMEOUT)
    elif result_code == SK_SIGNING_PLUGIN_WRONG_DATA:
      raise errors.U2FError(errors.U2FError.DEVICE_INELIGIBLE)
    elif result_code != SK_SIGNING_PLUGIN_NO_ERROR:
      raise errors.PluginError(
          'Plugin failed with error {} - {} (exit_status={})'
          .format(result_code,
                  json_response.get('errorDetail'),
                  exit_status))

    # Ensure response data is present
    response_data = json_response.get('responseData')
    if response_data is None:
      raise errors.PluginErrors(
          'Plugin returned output with missing responseData (exit_status={})'
          .format(exit_status))

    return response_data

  def _SHA256(self, string):
    """Helper method to perform SHA256."""
    md = hashlib.sha256()
    md.update(string.encode())
    return md.digest()

  def _Base64Encode(self, bytes_data):
      """Helper method to base64 encode, strip padding, and return str
      result."""
      return base64.urlsafe_b64encode(bytes_data).decode().rstrip('=')