HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/firebase/test/history_picker.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A library to find a Tool Results History to publish results to."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as apitools_exceptions

from googlecloudsdk.api_lib.firebase.test import util
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.core import log


class ToolResultsHistoryPicker(object):
  """Finds a History to publish mobile test results to.
  """

  def __init__(self, project, client, messages):
    """Construct a ToolResultsHistoryPicker.

    Args:
      project: string containing the GCE project id.
      client: ToolResults API client lib generated by apitools.
      messages: ToolResults API message classes generated by apitools.
    """
    self._project = project
    self._client = client
    self._messages = messages

  def _ListHistoriesByName(self, history_name, page_size):
    """Lists histories by name using the Tool Results API.

    Args:
       history_name: string containing the history name.
       page_size: maximum number of histories to return.

    Returns:
      A list of histories matching the name.

    Raises:
      HttpException if the Tool Results service reports a backend error.
    """
    request = self._messages.ToolresultsProjectsHistoriesListRequest(
        projectId=self._project, filterByName=history_name, pageSize=page_size)
    try:
      response = self._client.projects_histories.List(request)
      log.debug('\nToolResultsHistories.List response:\n{0}\n'.format(response))
      return response
    except apitools_exceptions.HttpError as error:
      msg = ('Http error while getting list of Tool Results Histories:\n{0}'
             .format(util.GetError(error)))
      raise exceptions.HttpException(msg)

  def _CreateHistory(self, history_name):
    """Creates a History using the Tool Results API.

    Args:
       history_name: string containing the name of the history to create.

    Returns:
      The history id of the created history.

    Raises:
      HttpException if the Tool Results service reports a backend error.
    """
    history = self._messages.History(name=history_name,
                                     displayName=history_name)
    request = self._messages.ToolresultsProjectsHistoriesCreateRequest(
        projectId=self._project, history=history)
    try:
      response = self._client.projects_histories.Create(request)
      log.debug('\nToolResultsHistories.Create response:\n{0}\n'
                .format(response))
      return response
    except apitools_exceptions.HttpError as error:
      msg = ('Http error while creating a Tool Results History:\n{0}'
             .format(util.GetError(error)))
      raise exceptions.HttpException(msg)

  def GetToolResultsHistoryId(self, history_name):
    """Gets the history id associated with a given history name.

    All the test executions for the same app should be in the same history. This
    method will try to find an existing history for the application or create
    one if this is the first time a particular history_name has been seen.

    Args:
       history_name: string containing the history's name (if the user supplied
         one), else None.

    Returns:
      The id of the history to publish results to.
    """
    if not history_name:
      return None

    # There might be several histories with the same name. We only fetch the
    # first history which is the history with the most recent results.
    histories = self._ListHistoriesByName(history_name, 1).histories
    if histories:
      return histories[0].historyId
    else:
      new_history = self._CreateHistory(history_name)
      return new_history.historyId