HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/platform/bq/clients/client_row_access_policy.py
#!/usr/bin/env python
"""The BigQuery CLI row access policy client library."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from typing import Any, Dict, List

# To configure apiclient logging.
from google.api_core import iam

from clients import bigquery_client
from utils import bq_id_utils

# IAM role name that represents being a grantee on a row access policy.
_FILTERED_DATA_VIEWER_ROLE = 'roles/bigquery.filteredDataViewer'


def create_row_access_policy(
    bqclient: bigquery_client.BigqueryClient,
    policy_reference: 'bq_id_utils.ApiClientHelper.RowAccessPolicyReference',
    grantees: List[str],
    filter_predicate: str,
):
  """Create a row access policy on the given table reference.

  Arguments:
   bqclient: BigQuery client to use for the request.
   policy_reference: Reference to the row access policy to create.
   grantees: Users or groups that can access rows protected by the row access
     policy.
   filter_predicate: A SQL boolean expression that needs to be true for a row to
     be included in the result.

  Returns:
    rowAccessPolicy: The created row access policy defined in
    google3/google/cloud/bigquery/v2/row_access_policy.proto;l=235;rcl=642795091
  """
  row_access_policy = {
      'rowAccessPolicyReference': {
          'projectId': policy_reference.projectId,
          'datasetId': policy_reference.datasetId,
          'tableId': policy_reference.tableId,
          'policyId': policy_reference.policyId,
      },
      'filterPredicate': filter_predicate,
      'grantees': grantees,
  }
  return (
      bqclient.GetRowAccessPoliciesApiClient()
      .rowAccessPolicies()
      .insert(
          projectId=policy_reference.projectId,
          datasetId=policy_reference.datasetId,
          tableId=policy_reference.tableId,
          body=row_access_policy,
      )
      .execute()
  )


def update_row_access_policy(
    bqclient: bigquery_client.BigqueryClient,
    policy_reference: 'bq_id_utils.ApiClientHelper.RowAccessPolicyReference',
    grantees: List[str],
    filter_predicate: str,
):
  """Update a row access policy on the given table reference.

  Arguments:
   bqclient: BigQuery client to use for the request.
   policy_reference: Reference to the row access policy to update.
   grantees: Users or groups that can access rows protected by the row access
     policy.
   filter_predicate: A SQL boolean expression that needs to be true for a row to
     be included in the result.

  Returns:
    rowAccessPolicy: The updated row access policy defined in
    google3/google/cloud/bigquery/v2/row_access_policy.proto;l=235;rcl=642795091
  """
  row_access_policy = {
      'rowAccessPolicyReference': {
          'projectId': policy_reference.projectId,
          'datasetId': policy_reference.datasetId,
          'tableId': policy_reference.tableId,
          'policyId': policy_reference.policyId,
      },
      'filterPredicate': filter_predicate,
      'grantees': grantees,
  }
  return (
      bqclient.GetRowAccessPoliciesApiClient()
      .rowAccessPolicies()
      .update(
          projectId=policy_reference.projectId,
          datasetId=policy_reference.datasetId,
          tableId=policy_reference.tableId,
          policyId=policy_reference.policyId,
          body=row_access_policy,
      )
      .execute()
  )


def get_row_access_policy(
    bqclient: bigquery_client.BigqueryClient,
    policy_reference: 'bq_id_utils.ApiClientHelper.RowAccessPolicyReference',
):
  """Get a row access policy on the given table reference."""
  response = _get_row_access_policy_reference(bqclient, policy_reference)

  if 'rowAccessPolicyReference' in response:
    _set_row_access_policy_grantees(bqclient, response)
  return response


def _get_row_access_policy_reference(
    bqclient: bigquery_client.BigqueryClient,
    policy_reference: 'bq_id_utils.ApiClientHelper.RowAccessPolicyReference',
) -> Dict[str, Any]:
  """Returns the RowAccessPolicyReference for the given row access policy."""
  return (
      bqclient.GetRowAccessPoliciesApiClient()
      .rowAccessPolicies()
      .get(
          projectId=policy_reference.projectId,
          datasetId=policy_reference.datasetId,
          tableId=policy_reference.tableId,
          policyId=policy_reference.policyId,
      )
      .execute()
  )


def delete_row_access_policy(
    bqclient: bigquery_client.BigqueryClient,
    policy_reference: 'bq_id_utils.ApiClientHelper.RowAccessPolicyReference',
    force: bool = False,
):
  """Delete a row access policy on the given table reference."""
  return (
      bqclient.GetRowAccessPoliciesApiClient()
      .rowAccessPolicies()
      .delete(
          projectId=policy_reference.projectId,
          datasetId=policy_reference.datasetId,
          tableId=policy_reference.tableId,
          policyId=policy_reference.policyId,
          force=force,
      )
      .execute()
  )


def _list_row_access_policies(
    bqclient: bigquery_client.BigqueryClient,
    table_reference: 'bq_id_utils.ApiClientHelper.TableReference',
    page_size: int,
    page_token: str,
) -> Dict[str, List[Any]]:
  """Lists row access policies for the given table reference."""
  return (
      bqclient.GetRowAccessPoliciesApiClient()
      .rowAccessPolicies()
      .list(
          projectId=table_reference.projectId,
          datasetId=table_reference.datasetId,
          tableId=table_reference.tableId,
          pageSize=page_size,
          pageToken=page_token,
      )
      .execute()
  )


def list_row_access_policies_with_grantees(
    bqclient: bigquery_client.BigqueryClient,
    table_reference: 'bq_id_utils.ApiClientHelper.TableReference',
    page_size: int,
    page_token: str,
    max_concurrent_iam_calls: int = 1,
) -> Dict[str, List[Any]]:
  """Lists row access policies for the given table reference.

  Arguments:
    bqclient: BigQuery client to use for the request.
    table_reference: Reference to the table.
    page_size: Number of results to return.
    page_token: Token to retrieve the next page of results.
    max_concurrent_iam_calls: Number of concurrent calls to getIAMPolicy.

  Returns:
    A dict that contains entries:
      'rowAccessPolicies': a list of row access policies, with an additional
        'grantees' field that contains the row access policy grantees.
      'nextPageToken': nextPageToken for the next page, if present.
  """
  response = _list_row_access_policies(
      bqclient=bqclient,
      table_reference=table_reference,
      page_size=page_size,
      page_token=page_token,
  )
  if 'rowAccessPolicies' in response:
    row_access_policies = response['rowAccessPolicies']
    for row_access_policy in row_access_policies:
      _set_row_access_policy_grantees(
          bqclient=bqclient,
          row_access_policy=row_access_policy,
      )
  return response


def _set_row_access_policy_grantees(
    bqclient: bigquery_client.BigqueryClient, row_access_policy
):
  """Sets the grantees on the given Row Access Policy."""
  row_access_policy_ref = (
      bq_id_utils.ApiClientHelper.RowAccessPolicyReference.Create(
          **row_access_policy['rowAccessPolicyReference']
      )
  )
  iam_policy = get_row_access_policy_iam_policy(
      bqclient=bqclient, reference=row_access_policy_ref
  )
  grantees = _get_grantees_from_row_access_policy_iam_policy(iam_policy)
  row_access_policy['grantees'] = grantees


def _get_grantees_from_row_access_policy_iam_policy(iam_policy):
  """Returns the filtered data viewer members of the given IAM policy."""
  bindings = iam_policy.get('bindings')
  if not bindings:
    return []

  filtered_data_viewer_binding = next(
      (
          binding
          for binding in bindings
          if binding.get('role') == _FILTERED_DATA_VIEWER_ROLE
      ),
      None,
  )
  if not filtered_data_viewer_binding:
    return []

  return filtered_data_viewer_binding.get('members', [])


def get_row_access_policy_iam_policy(
    bqclient: bigquery_client.BigqueryClient,
    reference: 'bq_id_utils.ApiClientHelper.RowAccessPolicyReference',
) -> iam.Policy:
  """Gets IAM policy for the given row access policy resource.

  Arguments:
    bqclient: BigQuery client to use for the request.
    reference: the RowAccessPolicyReference for the row access policy resource.

  Returns:
    The IAM policy attached to the given row access policy resource.

  Raises:
    BigqueryTypeError: if reference is not a RowAccessPolicyReference.
  """
  bq_id_utils.typecheck(
      reference,
      bq_id_utils.ApiClientHelper.RowAccessPolicyReference,
      method='get_row_access_policy_iam_policy',
  )
  formatted_resource = (
      'projects/%s/datasets/%s/tables/%s/rowAccessPolicies/%s'
      % (
          reference.projectId,
          reference.datasetId,
          reference.tableId,
          reference.policyId,
      )
  )
  return (
      bqclient.GetIAMPolicyApiClient()
      .rowAccessPolicies()
      .getIamPolicy(resource=formatted_resource)
      .execute()
  )