HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/transfer/jobs_apitools_util.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils for managing the many transfer job flags.

Tested through surface/transfer/jobs/create_test.py.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import datetime

from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.transfer import creds_util
from googlecloudsdk.command_lib.transfer import jobs_flag_util
from googlecloudsdk.command_lib.transfer import name_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import times

# Job fields included in every update field mask built in this module; the
# spec-specific fields are appended below.
UPDATE_FIELD_MASK = [
    'description',
    'logging_config',
    'notification_config',
    'status',
]
# Comma-separated field mask used when patching a job that has a transfer spec.
UPDATE_FIELD_MASK_WITH_TRANSFER_SPEC = ','.join(
    UPDATE_FIELD_MASK + ['schedule', 'transfer_spec']
)
# Comma-separated field mask used when patching a job that has a replication
# spec (no 'schedule' entry is included here).
UPDATE_FIELD_MASK_WITH_REPLICATION_SPEC = ','.join(
    UPDATE_FIELD_MASK + ['replication_spec']
)

# URL schemes offered for both transfer sources and transfer destinations.
COMMON_VALID_TRANSFER_SCHEMES = (
    storage_url.ProviderPrefix.POSIX,
    storage_url.ProviderPrefix.GCS,
    storage_url.ProviderPrefix.S3,
    storage_url.ProviderPrefix.HTTP,
    storage_url.ProviderPrefix.HTTPS,
)

# Sources additionally allow HDFS on top of the common schemes.
VALID_SOURCE_TRANSFER_SCHEMES = COMMON_VALID_TRANSFER_SCHEMES + (
    storage_url.ProviderPrefix.HDFS,
)
VALID_DESTINATION_TRANSFER_SCHEMES = COMMON_VALID_TRANSFER_SCHEMES
# Replication specs only accept GCS URLs. The identifier's spelling
# ("REPLICATON") is kept as-is because other code refers to it by this name.
VALID_REPLICATON_SCHEMES = [storage_url.ProviderPrefix.GCS]


def _prompt_user_and_add_valid_scheme(url, valid_schemes):
  """Returns `url`, swapping in a user-selected scheme if it is a file URL.

  URLs whose scheme is anything other than FILE are returned unchanged.

  Args:
    url: parsed storage URL object to inspect.
    valid_schemes: indexable sequence of scheme enum members the user may
      choose from.

  Returns:
    The original URL, or the URL produced by storage_url.switch_scheme with
    the scheme the user picked.

  Raises:
    errors.InvalidUrlError: the URL is a file URL and prompting is not
      possible.
  """
  if url.scheme is not storage_url.ProviderPrefix.FILE:
    # A scheme was supplied explicitly; nothing to ask.
    return url

  if not console_io.CanPrompt():
    raise errors.InvalidUrlError(
        'Did you mean "posix://{}"'.format(url.resource_name)
    )

  scheme_choices = [candidate.value + '://' for candidate in valid_schemes]
  prompt_text = (
      'Storage Transfer does not support direct file URLs: {}\n'
      'Did you mean to use "posix://"?\n'
      'Run this command with "--help" for more info,\n'
      'or select a valid scheme below.'
  ).format(url)
  chosen_index = console_io.PromptChoice(
      scheme_choices,
      cancel_option=True,
      message=prompt_text,
  )
  return storage_url.switch_scheme(url, valid_schemes[chosen_index])


def add_source_url(specs, args, messages, source_url):
  """Writes the data-source field on `specs` that corresponds to `source_url`.

  At most one source field is set, selected by the URL's scheme (HDFS, POSIX,
  GCS, S3) or, failing those, by the URL being an AzureUrl instance. A URL
  matching none of these leaves `specs` untouched.

  Args:
    specs:
      a submessage, must be one of [job.transferSpec, job.replicationSpec].
    args: argparse.namespace, the parsed arguments from the command line.
    messages: storagetransfer_v1_message instance.
    source_url:
      An instance of the storage_url variable specifying the source
      location for the data transfer.
  """
  scheme = source_url.scheme
  provider = storage_url.ProviderPrefix

  if scheme is provider.HDFS:
    specs.hdfsDataSource = messages.HdfsData(path=source_url.resource_name)
    return

  if scheme is provider.POSIX:
    specs.posixDataSource = messages.PosixFilesystem(
        rootDirectory=source_url.resource_name
    )
    return

  if scheme is provider.GCS:
    specs.gcsDataSource = messages.GcsData(
        bucketName=source_url.bucket_name,
        path=source_url.resource_name,
    )
    return

  if scheme is provider.S3:
    endpoint = args.source_endpoint
    if endpoint:
      # A custom endpoint means an S3-compatible provider rather than AWS S3.
      specs.awsS3CompatibleDataSource = messages.AwsS3CompatibleData(
          bucketName=source_url.bucket_name,
          endpoint=endpoint,
          path=source_url.resource_name,
          region=args.source_signing_region,
          s3Metadata=_get_s3_compatible_metadata(args, messages),
      )
    else:
      specs.awsS3DataSource = messages.AwsS3Data(
          bucketName=source_url.bucket_name,
          path=source_url.resource_name,
      )
    return

  if isinstance(source_url, storage_url.AzureUrl):
    specs.azureBlobStorageDataSource = messages.AzureBlobStorageData(
        container=source_url.bucket_name,
        path=source_url.resource_name,
        storageAccount=source_url.account,
    )


def add_destination_url(specs, messages, destination_url):
  """Writes the data-sink field on `specs` that matches `destination_url`.

  Only GCS and POSIX destinations are handled; any other scheme leaves
  `specs` untouched.

  Args:
    specs:
      a submessage, must be one of [job.transferSpec, job.replicationSpec]
    messages: storagetransfer_v1_message instance.
    destination_url:
      An instance of the storage_url variable specifying the destination
      location for the data transfer.
  """
  scheme = destination_url.scheme

  if scheme is storage_url.ProviderPrefix.GCS:
    specs.gcsDataSink = messages.GcsData(
        bucketName=destination_url.bucket_name,
        path=destination_url.resource_name,
    )
    return

  if scheme is storage_url.ProviderPrefix.POSIX:
    specs.posixDataSink = messages.PosixFilesystem(
        rootDirectory=destination_url.resource_name
    )


def validate_and_add_source_url(
    specs, args, messages, source_url, valid_schemes
):
  """Resolves the source URL's scheme, then records it on the spec.

  A URL given without a scheme triggers a prompt asking the user to pick a
  valid one (e.g., 'gs://'); the resulting URL is then added to `specs`.

  Args:
    specs:
      a submessage, must be one of [job.transferSpec, job.replicationSpec].
    args: argparse.namespace, the parsed arguments from the command line.
    messages: storagetransfer_v1_message instance.
    source_url:
      An instance of the storage_url variable specifying the source
      location for the data transfer.
    valid_schemes: the schemes supported by the specs.
  """
  add_source_url(
      specs,
      args,
      messages,
      _prompt_user_and_add_valid_scheme(source_url, valid_schemes),
  )


def validate_and_add_destination_url(
    specs, messages, destination_url, valid_schemes
):
  """Resolves the destination URL's scheme, then records it on the spec.

  A URL given without a scheme triggers a prompt asking the user to pick a
  valid one (e.g., 'gs://'); the resulting URL is then added to `specs`.

  Args:
    specs:
      a submessage, must be one of [job.transferSpec, job.replicationSpec]
    messages: storagetransfer_v1_message instance.
    destination_url:
      An instance of the storage_url variable specifying the destination
      location for the data transfer.
    valid_schemes: the schemes supported by the specs.
  """
  add_destination_url(
      specs,
      messages,
      _prompt_user_and_add_valid_scheme(destination_url, valid_schemes),
  )


def _create_or_modify_transfer_options(transfer_spec, args, messages):
  """Creates or modifies TransferOptions object based on args."""
  if not (getattr(args, 'overwrite_when', None) or getattr(
      args, 'delete_from', None) or getattr(args, 'preserve_metadata', None) or
          getattr(args, 'custom_storage_class', None)):
    return
  if not transfer_spec.transferOptions:
    transfer_spec.transferOptions = messages.TransferOptions()

  overwrite_when_argument = getattr(args, 'overwrite_when', None)
  if overwrite_when_argument:
    transfer_spec.transferOptions.overwriteWhen = getattr(
        messages.TransferOptions.OverwriteWhenValueValuesEnum,
        overwrite_when_argument.upper())

  if getattr(args, 'delete_from', None):
    delete_option = jobs_flag_util.DeleteOption(args.delete_from)
    if delete_option is jobs_flag_util.DeleteOption.SOURCE_AFTER_TRANSFER:
      transfer_spec.transferOptions.deleteObjectsFromSourceAfterTransfer = True
    elif delete_option is jobs_flag_util.DeleteOption.DESTINATION_IF_UNIQUE:
      transfer_spec.transferOptions.deleteObjectsUniqueInSink = True

  metadata_options = messages.MetadataOptions()
  if getattr(args, 'preserve_metadata', None):
    for field_value in args.preserve_metadata:
      field_key = jobs_flag_util.PreserveMetadataField(field_value)
      if field_key == jobs_flag_util.PreserveMetadataField.ACL:
        metadata_options.acl = (
            messages.MetadataOptions.AclValueValuesEnum.ACL_PRESERVE)
      elif field_key == jobs_flag_util.PreserveMetadataField.GID:
        metadata_options.gid = (
            messages.MetadataOptions.GidValueValuesEnum.GID_NUMBER)
      elif field_key == jobs_flag_util.PreserveMetadataField.UID:
        metadata_options.uid = (
            messages.MetadataOptions.UidValueValuesEnum.UID_NUMBER)
      elif field_key == jobs_flag_util.PreserveMetadataField.KMS_KEY:
        metadata_options.kmsKey = (
            messages.MetadataOptions.KmsKeyValueValuesEnum.KMS_KEY_PRESERVE)
      elif field_key == jobs_flag_util.PreserveMetadataField.MODE:
        metadata_options.mode = (
            messages.MetadataOptions.ModeValueValuesEnum.MODE_PRESERVE)
      elif field_key == jobs_flag_util.PreserveMetadataField.STORAGE_CLASS:
        metadata_options.storageClass = (
            messages.MetadataOptions.StorageClassValueValuesEnum
            .STORAGE_CLASS_PRESERVE)
      elif field_key == jobs_flag_util.PreserveMetadataField.SYMLINK:
        metadata_options.symlink = (
            messages.MetadataOptions.SymlinkValueValuesEnum.SYMLINK_PRESERVE)
      elif field_key == jobs_flag_util.PreserveMetadataField.TEMPORARY_HOLD:
        metadata_options.temporaryHold = (
            messages.MetadataOptions.TemporaryHoldValueValuesEnum
            .TEMPORARY_HOLD_PRESERVE)
      elif field_key == jobs_flag_util.PreserveMetadataField.TIME_CREATED:
        metadata_options.timeCreated = (
            messages.MetadataOptions.TimeCreatedValueValuesEnum.TIME_CREATED_PRESERVE_AS_CUSTOM_TIME
        )
  if getattr(args, 'custom_storage_class', None):
    metadata_options.storageClass = getattr(
        messages.MetadataOptions.StorageClassValueValuesEnum,
        'STORAGE_CLASS_' + args.custom_storage_class.upper())

  if metadata_options != messages.MetadataOptions():
    transfer_spec.transferOptions.metadataOptions = metadata_options


def _create_or_modify_object_conditions(transfer_spec, args, messages):
  """Creates or modifies ObjectConditions based on args."""
  if not (getattr(args, 'include_prefixes', None) or
          getattr(args, 'exclude_prefixes', None) or
          getattr(args, 'include_modified_before_absolute', None) or
          getattr(args, 'include_modified_after_absolute', None) or
          getattr(args, 'include_modified_before_relative', None) or
          getattr(args, 'include_modified_after_relative', None)):
    return
  if not transfer_spec.objectConditions:
    transfer_spec.objectConditions = messages.ObjectConditions()

  if getattr(args, 'include_prefixes', None):
    transfer_spec.objectConditions.includePrefixes = args.include_prefixes
  if getattr(args, 'exclude_prefixes', None):
    transfer_spec.objectConditions.excludePrefixes = args.exclude_prefixes
  if getattr(args, 'include_modified_before_absolute', None):
    modified_before_datetime_string = (
        args.include_modified_before_absolute.astimezone(times.UTC).isoformat())
    transfer_spec.objectConditions.lastModifiedBefore = modified_before_datetime_string
  if getattr(args, 'include_modified_after_absolute', None):
    modified_after_datetime_string = (
        args.include_modified_after_absolute.astimezone(times.UTC).isoformat())
    transfer_spec.objectConditions.lastModifiedSince = modified_after_datetime_string
  if getattr(args, 'include_modified_before_relative', None):
    transfer_spec.objectConditions.minTimeElapsedSinceLastModification = '{}s'.format(
        args.include_modified_before_relative)
  if getattr(args, 'include_modified_after_relative', None):
    transfer_spec.objectConditions.maxTimeElapsedSinceLastModification = '{}s'.format(
        args.include_modified_after_relative)


def _create_or_modify_creds(transfer_spec, args, messages):
  """Creates or modifies TransferSpec source creds based on args."""
  if transfer_spec.awsS3DataSource:
    if getattr(args, 'source_creds_file', None):
      access_key_id, secret_access_key, role_arn = (
          creds_util.get_aws_creds_from_file(args.source_creds_file))
    else:
      log.warning('No --source-creds-file flag. Checking system config files'
                  ' for AWS credentials.')
      access_key_id, secret_access_key = creds_util.get_default_aws_creds()
      role_arn = None

    if not ((access_key_id and secret_access_key) or role_arn):
      log.warning('Missing AWS source creds.')

    transfer_spec.awsS3DataSource.awsAccessKey = messages.AwsAccessKey(
        accessKeyId=access_key_id, secretAccessKey=secret_access_key)
    transfer_spec.awsS3DataSource.roleArn = role_arn

  elif transfer_spec.azureBlobStorageDataSource:
    if getattr(args, 'source_creds_file', None):
      sas_token = creds_util.get_values_for_keys_from_file(
          args.source_creds_file, ['sasToken'])['sasToken']
    else:
      log.warning('No Azure source creds set. Consider adding'
                  ' --source-creds-file flag.')
      sas_token = None
    transfer_spec.azureBlobStorageDataSource.azureCredentials = (
        messages.AzureCredentials(sasToken=sas_token))


def _get_s3_compatible_metadata(args, messages):
  """Generates advanced settings for S3-compatible providers."""
  if not (args.source_auth_method or args.source_list_api or
          args.source_network_protocol or args.source_request_model):
    return None
  s3_compatible_metadata = messages.S3CompatibleMetadata()
  if args.source_auth_method:
    s3_compatible_metadata.authMethod = getattr(
        messages.S3CompatibleMetadata.AuthMethodValueValuesEnum,
        'AUTH_METHOD_' + args.source_auth_method)
  if args.source_list_api:
    s3_compatible_metadata.listApi = getattr(
        messages.S3CompatibleMetadata.ListApiValueValuesEnum,
        args.source_list_api)
  if args.source_network_protocol:
    s3_compatible_metadata.protocol = getattr(
        messages.S3CompatibleMetadata.ProtocolValueValuesEnum,
        'NETWORK_PROTOCOL_' + args.source_network_protocol)
  if args.source_request_model:
    s3_compatible_metadata.requestModel = getattr(
        messages.S3CompatibleMetadata.RequestModelValueValuesEnum,
        'REQUEST_MODEL_' + args.source_request_model)

  return s3_compatible_metadata


def _add_additional_s3_source_options(transfer_spec, args):
  """Adds additional options for S3 source."""
  if args.s3_cloudfront_domain:
    transfer_spec.awsS3DataSource.cloudfrontDomain = args.s3_cloudfront_domain


def _create_or_modify_transfer_spec(job, args, messages):
  """Builds or updates job.transferSpec from the command-line arguments.

  A new source or destination replaces every previously-set source or sink
  field. Agent pools, intermediate storage, manifest, credentials, object
  conditions and transfer options are then applied in that order.

  Args:
    job: transfer job message whose transferSpec is created or updated.
    args: parsed command-line arguments.
    messages: storagetransfer_v1_message instance.

  Raises:
    errors.InvalidUrlError: the source cannot be parsed as a storage URL and
      does not start with the HTTP scheme prefix.
  """
  if not job.transferSpec:
    job.transferSpec = messages.TransferSpec()
  spec = job.transferSpec

  requested_source = getattr(args, 'source', None)
  if requested_source:
    # Clear any existing source to make space for new one.
    for source_field in (
        'httpDataSource',
        'posixDataSource',
        'gcsDataSource',
        'awsS3CompatibleDataSource',
        'awsS3DataSource',
        'azureBlobStorageDataSource',
        'hdfsDataSource',
    ):
      setattr(spec, source_field, None)

    try:
      parsed_source = storage_url.storage_url_from_string(requested_source)
    except errors.InvalidUrlError:
      # Sources the URL parser rejects are still accepted as an HTTP list
      # URL when they start with the HTTP scheme prefix.
      http_prefix = storage_url.ProviderPrefix.HTTP.value
      if not requested_source.startswith(http_prefix):
        raise
      spec.httpDataSource = messages.HttpData(listUrl=requested_source)
    else:
      validate_and_add_source_url(
          spec,
          args,
          messages,
          parsed_source,
          VALID_SOURCE_TRANSFER_SCHEMES,
      )

  # If additional options are specified for S3 source, add them here.
  if spec.awsS3DataSource:
    _add_additional_s3_source_options(spec, args)

  requested_destination = getattr(args, 'destination', None)
  if requested_destination:
    # Clear any existing destination to make space for new one.
    for sink_field in ('posixDataSink', 'gcsDataSink'):
      setattr(spec, sink_field, None)

    validate_and_add_destination_url(
        spec,
        messages,
        storage_url.storage_url_from_string(requested_destination),
        VALID_DESTINATION_TRANSFER_SCHEMES,
    )

  sink_pool = getattr(args, 'destination_agent_pool', None)
  if sink_pool:
    spec.sinkAgentPoolName = name_util.add_agent_pool_prefix(sink_pool)

  source_pool = getattr(args, 'source_agent_pool', None)
  if source_pool:
    spec.sourceAgentPoolName = name_util.add_agent_pool_prefix(source_pool)

  intermediate_path = getattr(args, 'intermediate_storage_path', None)
  if intermediate_path:
    intermediate_url = storage_url.storage_url_from_string(intermediate_path)
    spec.gcsIntermediateDataLocation = messages.GcsData(
        bucketName=intermediate_url.bucket_name,
        path=intermediate_url.resource_name,
    )

  manifest_location = getattr(args, 'manifest_file', None)
  if manifest_location:
    spec.transferManifest = messages.TransferManifest(
        location=manifest_location
    )

  _create_or_modify_creds(spec, args, messages)
  _create_or_modify_object_conditions(spec, args, messages)
  _create_or_modify_transfer_options(spec, args, messages)


def _create_or_modify_event_stream_configuration(job, args, messages):
  """Creates or modifies event stream config. Returns if flag present."""
  event_stream_name = getattr(args, 'event_stream_name', None)
  event_stream_start = getattr(args, 'event_stream_starts', None)
  event_stream_expire = getattr(args, 'event_stream_expires', None)
  if not (event_stream_name or event_stream_start or event_stream_expire):
    # Nothing needs modification.
    return False

  if not job.eventStream:
    job.eventStream = messages.EventStream()
  job.eventStream.name = event_stream_name
  job.eventStream.eventStreamStartTime = event_stream_start
  job.eventStream.eventStreamExpirationTime = event_stream_expire
  return True


def _create_or_modify_schedule(
    job, args, messages, is_update, is_event_driven_transfer=False
):
  """Creates or modifies transfer Schedule object based on args."""
  schedule_starts = getattr(args, 'schedule_starts', None)
  schedule_repeats_every = getattr(args, 'schedule_repeats_every', None)
  schedule_repeats_until = getattr(args, 'schedule_repeats_until', None)
  has_schedule_flag = (
      schedule_starts or schedule_repeats_every or schedule_repeats_until
  )

  if has_schedule_flag:
    if not is_update and args.do_not_run:
      raise ValueError('Cannot set schedule and do-not-run flag.')
  if is_event_driven_transfer and (
      has_schedule_flag or getattr(args, 'do_not_run', False)
  ):
    raise ValueError('Cannot set schedule on event-driven transfer.')

  if (
      (not is_update and args.do_not_run)
      or is_event_driven_transfer
      or (is_update and not has_schedule_flag)
  ):
    # (1) Cannot have schedule for non-running job.
    # (2) Cannot have schedule on event-driven transfer.
    # (3) Nothing needs updating.
    return
  if not job.schedule:
    job.schedule = messages.Schedule()

  if schedule_starts:
    start = schedule_starts.astimezone(times.UTC)

    job.schedule.scheduleStartDate = messages.Date(
        day=start.day,
        month=start.month,
        year=start.year,
    )
    job.schedule.startTimeOfDay = messages.TimeOfDay(
        hours=start.hour,
        minutes=start.minute,
        seconds=start.second,
    )
  elif not is_update:
    # By default, run job immediately on create.
    today_date = datetime.date.today()
    job.schedule.scheduleStartDate = messages.Date(
        day=today_date.day, month=today_date.month, year=today_date.year)

  if schedule_repeats_every:
    job.schedule.repeatInterval = '{}s'.format(schedule_repeats_every)
    # Default behavior of running job every 24 hours if field not set will be
    # blocked by schedule_repeats_until handling.

  if schedule_repeats_until:
    if not job.schedule.repeatInterval:
      raise ValueError(
          'Scheduling a job end time requires setting a frequency with'
          ' --schedule-repeats-every. If no job end time is set, the job will'
          ' run one time.')
    end = schedule_repeats_until.astimezone(times.UTC)
    job.schedule.scheduleEndDate = messages.Date(
        day=end.day,
        month=end.month,
        year=end.year,
    )
    job.schedule.endTimeOfDay = messages.TimeOfDay(
        hours=end.hour,
        minutes=end.minute,
        seconds=end.second,
    )
  elif not is_update and not job.schedule.repeatInterval:
    # By default, run operation once on create.
    # If job frequency set, allow operation to repeat endlessly.
    job.schedule.scheduleEndDate = job.schedule.scheduleStartDate


def _create_or_modify_notification_config(job, args, messages, is_update=False):
  """Creates or modifies transfer NotificationConfig object based on args."""
  notification_pubsub_topic = getattr(args, 'notification_pubsub_topic', None)
  notification_event_types = getattr(args, 'notification_event_types', None)
  notification_payload_format = getattr(args, 'notification_payload_format',
                                        None)
  if not (notification_pubsub_topic or notification_event_types or
          notification_payload_format):
    # Nothing to modify with.
    return

  if notification_pubsub_topic:
    if not job.notificationConfig:
      # Create config with required PubSub topic.
      job.notificationConfig = messages.NotificationConfig(
          pubsubTopic=notification_pubsub_topic)
    else:
      job.notificationConfig.pubsubTopic = notification_pubsub_topic

  if (notification_event_types or
      notification_payload_format) and not job.notificationConfig:
    raise ValueError('Cannot set notification config without'
                     ' --notification-pubsub-topic.')

  if notification_payload_format:
    payload_format_key = notification_payload_format.upper()
    job.notificationConfig.payloadFormat = getattr(
        messages.NotificationConfig.PayloadFormatValueValuesEnum,
        payload_format_key)
  elif not is_update:
    # New job default.
    job.notificationConfig.payloadFormat = (
        messages.NotificationConfig.PayloadFormatValueValuesEnum.JSON)

  if notification_event_types:
    event_types = []
    for event_type_arg in notification_event_types:
      event_type_key = 'TRANSFER_OPERATION_' + event_type_arg.upper()
      event_type = getattr(
          messages.NotificationConfig.EventTypesValueListEntryValuesEnum,
          event_type_key)
      event_types.append(event_type)
    job.notificationConfig.eventTypes = event_types
  elif not is_update:
    # New job default.
    job.notificationConfig.eventTypes = [
        (messages.NotificationConfig.EventTypesValueListEntryValuesEnum
         .TRANSFER_OPERATION_SUCCESS),
        (messages.NotificationConfig.EventTypesValueListEntryValuesEnum
         .TRANSFER_OPERATION_FAILED),
        (messages.NotificationConfig.EventTypesValueListEntryValuesEnum
         .TRANSFER_OPERATION_ABORTED)
    ]


def _enable_onprem_gcs_transfer_logs(job, args, is_update):
  """Sets enableOnpremGcsTransferLogs boolean."""
  enable_posix_transfer_logs = getattr(args, 'enable_posix_transfer_logs', None)
  # GCS transfer logs only supported for POSIX.
  if job.replicationSpec or not (
      job.transferSpec.posixDataSource or job.transferSpec.posixDataSink
  ):
    job.loggingConfig.enableOnpremGcsTransferLogs = False
  # Caller has specifically enabled or disabled logs.
  elif enable_posix_transfer_logs is not None:
    job.loggingConfig.enableOnpremGcsTransferLogs = enable_posix_transfer_logs
  # Default to creating new POSIX transfers with GCS transfer logs enabled.
  elif not is_update:
    job.loggingConfig.enableOnpremGcsTransferLogs = True
  # Avoid modifying existing POSIX transfers on UpdateTransferJob.
  else:
    pass
  return


def _create_or_modify_logging_config(job, args, messages, is_update):
  """Applies the logging flags found on `args` to job.loggingConfig.

  The logging config is created if missing and the on-prem GCS transfer log
  setting is always refreshed. Log actions and log action states are only
  replaced when their flags are given.

  Args:
    job: transfer job message whose loggingConfig is created or updated.
    args: parsed command-line arguments; absent flags are treated as unset.
    messages: storagetransfer_v1_message instance.
    is_update: True when modifying an existing job rather than creating one.

  Raises:
    ValueError: only one of --log-actions / --log-action-states was given and
      the job has no existing value for the other one.
  """
  if not job.loggingConfig:
    job.loggingConfig = messages.LoggingConfig()
  # TODO(b/322289474): enable-posix-transfer-logs logic can be cleaned up once
  #                    POSIX logs are deprecated.
  _enable_onprem_gcs_transfer_logs(job, args, is_update)

  requested_actions = getattr(args, 'log_actions', None)
  requested_states = getattr(args, 'log_action_states', None)
  if not (requested_actions or requested_states):
    # Nothing remaining to modify with.
    return

  current_actions = job.loggingConfig and job.loggingConfig.logActions
  current_states = job.loggingConfig and job.loggingConfig.logActionStates

  only_one_flag_given = not (requested_actions and requested_states)
  counterpart_missing = (
      (requested_actions and not current_states)
      or (requested_states and not current_actions)
  )
  if only_one_flag_given and counterpart_missing:
    raise ValueError('Both --log-actions and --log-action-states are required'
                     ' for a complete log config.')

  if requested_actions:
    job.loggingConfig.logActions = [
        getattr(
            job.loggingConfig.LogActionsValueListEntryValuesEnum,
            action_name.upper(),
        )
        for action_name in requested_actions
    ]

  if requested_states:
    job.loggingConfig.logActionStates = [
        getattr(
            job.loggingConfig.LogActionStatesValueListEntryValuesEnum,
            state_name.upper(),
        )
        for state_name in requested_states
    ]


def generate_patch_transfer_job_message(messages, job, field_mask):
  """Wraps `job` in an Apitools patch request for transfer jobs.

  The job is modified in place: its projectId is moved onto the update
  request, and a schedule equal to an empty Schedule is cleared.

  Args:
    messages: storagetransfer_v1_message instance.
    job: transfer job message to send.
    field_mask: value for the request's updateTransferJobFieldMask.

  Returns:
    A StoragetransferTransferJobsPatchRequest for the job.
  """
  owning_project = job.projectId
  job.projectId = None

  # Jobs returned by API are populated with their user-set schedule or an
  # empty schedule. Empty schedules cannot be re-submitted to the API.
  if job.schedule == messages.Schedule():
    job.schedule = None

  update_request = messages.UpdateTransferJobRequest(
      projectId=owning_project,
      transferJob=job,
      updateTransferJobFieldMask=field_mask,
  )
  return messages.StoragetransferTransferJobsPatchRequest(
      jobName=job.name,
      updateTransferJobRequest=update_request,
  )


def _create_or_modify_replication_spec(
    job, args, messages, has_event_stream_flag=False
):
  """Adds/Updates the replication spec to transfer job."""
  if has_event_stream_flag:
    raise ValueError(
        'Not allowed to set event stream flags on replication jobs.'
    )
  if not job.replicationSpec:
    job.replicationSpec = messages.ReplicationSpec()

  if getattr(args, 'source', None):
    # Clear any existing source to make space for new one.
    job.replicationSpec.gcsDataSource = None

    source_url = storage_url.storage_url_from_string(args.source)
    if source_url.scheme not in VALID_REPLICATON_SCHEMES:
      raise errors.Error(
          'Replication feature is currently available for Google Cloud Storage'
          ' buckets only.'
      )
    validate_and_add_source_url(
        job.replicationSpec,
        args,
        messages,
        source_url,
        VALID_REPLICATON_SCHEMES,
    )

  if getattr(args, 'destination', None):
    # Clear any existing destination to make space for new one.
    job.replicationSpec.gcsDataSink = None

    destination_url = storage_url.storage_url_from_string(args.destination)
    if destination_url.scheme not in VALID_REPLICATON_SCHEMES:
      raise errors.Error(
          'Replication feature is currently available for Google Cloud Storage'
          ' buckets only.'
      )
    validate_and_add_destination_url(
        job.replicationSpec, messages, destination_url, VALID_REPLICATON_SCHEMES
    )

  _create_or_modify_object_conditions(job.replicationSpec, args, messages)
  _create_or_modify_transfer_options(job.replicationSpec, args, messages)


def generate_transfer_job_message(args, messages, existing_job=None):
  """Builds the Apitools message for creating or updating a transfer job.

  Args:
    args: parsed command-line arguments; absent flags are treated as unset.
    messages: storagetransfer_v1_message instance.
    existing_job: the job being updated, or a falsy value to create a new
      one. When given, it is modified in place.

  Returns:
    For a create, the populated TransferJob message. For an update, the patch
    request produced by generate_patch_transfer_job_message.
  """
  is_update = bool(existing_job)
  job = existing_job if existing_job else messages.TransferJob()

  if not job.projectId:
    job.projectId = properties.VALUES.core.project.Get()

  requested_name = getattr(args, 'name', None)
  if requested_name:
    job.name = name_util.add_job_prefix(requested_name)

  requested_description = getattr(args, 'description', None)
  if requested_description:
    job.description = requested_description

  if not existing_job:
    # New jobs always start out enabled.
    job.status = messages.TransferJob.StatusValueValuesEnum.ENABLED
  else:
    # Is job update instead of create.
    requested_status = getattr(args, 'status', None)
    if requested_status:
      job.status = getattr(
          messages.TransferJob.StatusValueValuesEnum, requested_status.upper()
      )

  has_event_stream_flag = _create_or_modify_event_stream_configuration(
      job, args, messages
  )

  # In case of create, replication flag shouldn't be there.
  created_as_transfer = not existing_job and not getattr(
      args, 'replication', None
  )
  # In case of update, transferSpec should exists.
  is_transfer_job = created_as_transfer or job.transferSpec

  if is_transfer_job:
    _create_or_modify_transfer_spec(job, args, messages)
  else:
    _create_or_modify_replication_spec(
        job, args, messages, has_event_stream_flag=has_event_stream_flag
    )

  _create_or_modify_schedule(
      job,
      args,
      messages,
      is_update=is_update,
      is_event_driven_transfer=job.eventStream or job.replicationSpec,
  )
  _create_or_modify_notification_config(
      job, args, messages, is_update=is_update
  )
  _create_or_modify_logging_config(job, args, messages, is_update=is_update)

  if not existing_job:
    return job

  if is_transfer_job:
    update_mask = UPDATE_FIELD_MASK_WITH_TRANSFER_SPEC
  else:
    update_mask = UPDATE_FIELD_MASK_WITH_REPLICATION_SPEC
  return generate_patch_transfer_job_message(messages, job, update_mask)