HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/datastream/stream_objects.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cloud Datastream stream objects API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import list_pager
from googlecloudsdk.api_lib.datastream import util


class StreamObjectsClient:
  """Client for stream objects service in the API."""

  def __init__(self, client=None, messages=None):
    self._client = client or util.GetClientInstance()
    self._messages = messages or util.GetMessagesModule()
    self._service = self._client.projects_locations_streams_objects
    self._resource_parser = util.GetResourceParser()

  def List(self, project_id, stream, args):
    """Get the list of objects in a stream.

    Args:
      project_id: The project ID to retrieve
      stream: The stream name to retrieve
      args: parsed command line arguments

    Returns:
      An iterator over all the matching stream objects.
    """
    stream_ref = self._resource_parser.Create(
        'datastream.projects.locations.streams',
        projectsId=project_id,
        streamsId=stream,
        locationsId=args.location)

    list_req_type = self._messages.DatastreamProjectsLocationsStreamsObjectsListRequest
    list_req = list_req_type(parent=stream_ref.RelativeName())

    return list_pager.YieldFromList(
        service=self._service,
        request=list_req,
        limit=args.limit,
        batch_size=args.page_size,
        field='streamObjects',
        batch_size_attribute='pageSize')

  def Lookup(self, project_id, stream_id, args):
    """Lookup a stream object.

    Args:
      project_id:
      stream_id:
      args: argparse.Namespace, The arguments that this command was invoked
        with.

    Returns:
      StreamObject: the looked up stream object.
    """
    object_identifier = self._messages.SourceObjectIdentifier()
    if args.oracle_schema:
      object_identifier.oracleIdentifier = (
          self._messages.OracleObjectIdentifier(
              schema=args.oracle_schema, table=args.oracle_table
          )
      )
    elif args.mysql_database:
      object_identifier.mysqlIdentifier = self._messages.MysqlObjectIdentifier(
          database=args.mysql_database, table=args.mysql_table
      )
    elif args.postgresql_schema:
      object_identifier.postgresqlIdentifier = (
          self._messages.PostgresqlObjectIdentifier(
              schema=args.postgresql_schema, table=args.postgresql_table
          )
      )
    elif args.sqlserver_schema:
      object_identifier.sqlServerIdentifier = (
          self._messages.SqlServerObjectIdentifier(
              schema=args.sqlserver_schema, table=args.sqlserver_table
          )
      )
    elif args.salesforce_object_name:
      object_identifier.salesforceIdentifier = (
          self._messages.SalesforceObjectIdentifier(
              objectName=args.salesforce_object_name
          )
      )
    elif args.mongodb_database:
      object_identifier.mongodbIdentifier = (
          self._messages.MongodbObjectIdentifier(
              database=args.mongodb_database, collection=args.mongodb_collection
          )
      )

    stream_ref = self._resource_parser.Create(
        'datastream.projects.locations.streams',
        projectsId=project_id,
        streamsId=stream_id,
        locationsId=args.location,
    )

    lookup_req_type = (
        self._messages.DatastreamProjectsLocationsStreamsObjectsLookupRequest
    )
    lookup_req = lookup_req_type(
        lookupStreamObjectRequest=self._messages.LookupStreamObjectRequest(
            sourceObjectIdentifier=object_identifier
        ),
        parent=stream_ref.RelativeName(),
    )
    return self._service.Lookup(lookup_req)

  def GetUri(self, name):
    """Get the URL string for a stream object.

    Args:
      name: stream object's full name.

    Returns:
      URL of the stream object resource
    """

    uri = self._resource_parser.ParseRelativeName(
        name, collection='datastream.projects.locations.streams.objects')
    return uri.SelfLink()