File: //snap/google-cloud-cli/394/lib/googlecloudsdk/appengine/tools/cron_xml_parser.py
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*- #
"""Directly processes text of cron.xml.
CronXmlParser is called with an XML string to produce a CronXml object
containing the data from the XML.
CronXmlParser: converts XML to CronXml objct
Cron: describes a single cron specified in cron.xml
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from xml.etree import ElementTree
from googlecloudsdk.appengine.tools import xml_parser_utils
from googlecloudsdk.appengine.tools.app_engine_config_exception import AppEngineConfigException
# pylint: disable=g-import-not-at-top
from googlecloudsdk.appengine._internal import six_subset
# groc depends on antlr3 which is py2-only, so conditionally import based on
# python version. See comments under GrocValidator.Validate for more context.
if six_subset.PY2:
from googlecloudsdk.appengine.googlecron import groc
from googlecloudsdk.appengine.googlecron import groctimespecification
else:
groc = None
groctimespecification = None
# pylint: enable=g-import-not-at-top
_RETRY_PARAMETER_TAGS = ('job-retry-limit',
'job-age-limit',
'min-backoff-seconds',
'max-backoff-seconds',
'max-doublings')
def GetCronYaml(unused_application, cron_xml_str):
return _MakeCronListIntoYaml(CronXmlParser().ProcessXml(cron_xml_str))
def _MakeCronListIntoYaml(cron_list):
"""Converts list of yaml statements describing cron jobs into a string."""
statements = ['cron:']
for cron in cron_list:
statements += cron.ToYaml()
return '\n'.join(statements) + '\n'
def _ProcessRetryParametersNode(node, cron):
"""Converts <retry-parameters> in node to cron.retry_parameters."""
retry_parameters_node = xml_parser_utils.GetChild(node, 'retry-parameters')
if retry_parameters_node is None:
cron.retry_parameters = None
return
retry_parameters = _RetryParameters()
cron.retry_parameters = retry_parameters
for tag in _RETRY_PARAMETER_TAGS:
if xml_parser_utils.GetChild(retry_parameters_node, tag) is not None:
setattr(
retry_parameters,
tag.replace('-', '_'),
xml_parser_utils.GetChildNodeText(retry_parameters_node, tag))
class CronXmlParser(object):
"""Provides logic for walking down XML tree and pulling data."""
def ProcessXml(self, xml_str):
"""Parses XML string and returns object representation of relevant info.
Args:
xml_str: The XML string.
Returns:
A list of Cron objects containing information about cron jobs from the
XML.
Raises:
AppEngineConfigException: In case of malformed XML or illegal inputs.
"""
try:
self.crons = []
self.errors = []
xml_root = ElementTree.fromstring(xml_str)
if xml_root.tag != 'cronentries':
raise AppEngineConfigException('Root tag must be <cronentries>')
for child in list(xml_root):
self.ProcessCronNode(child)
if self.errors:
raise AppEngineConfigException('\n'.join(self.errors))
return self.crons
except ElementTree.ParseError:
raise AppEngineConfigException('Bad input -- not valid XML')
def ProcessCronNode(self, node):
"""Processes XML <cron> nodes into Cron objects.
The following information is parsed out:
description: Describing the purpose of the cron job.
url: The location of the script.
schedule: Written in groc; the schedule according to which the job is
executed.
timezone: The timezone that the schedule runs in.
target: Which version of the app this applies to.
Args:
node: <cron> XML node in cron.xml.
"""
tag = xml_parser_utils.GetTag(node)
if tag != 'cron':
self.errors.append('Unrecognized node: <%s>' % tag)
return
cron = Cron()
cron.url = xml_parser_utils.GetChildNodeText(node, 'url')
cron.timezone = xml_parser_utils.GetChildNodeText(node, 'timezone')
cron.target = xml_parser_utils.GetChildNodeText(node, 'target')
cron.description = xml_parser_utils.GetChildNodeText(node, 'description')
cron.schedule = xml_parser_utils.GetChildNodeText(node, 'schedule')
_ProcessRetryParametersNode(node, cron)
validation_error = self._ValidateCronEntry(cron)
if validation_error:
self.errors.append(validation_error)
else:
self.crons.append(cron)
def _ValidateCronEntry(self, cron):
# TODO(user): consider timezone validation, not currently done by the
# Java SDK.
if not cron.url:
return 'No URL for <cron> entry'
if not cron.schedule:
return "No schedule provided for <cron> entry with URL '%s'" % cron.url
# If we're running on py3 and don't have access to groctimespecification,
# then the server will still do the validation on the schedule property.
if groc and groctimespecification:
try:
groctimespecification.GrocTimeSpecification(cron.schedule)
except groc.GrocException:
return ("Text '%s' in <schedule> node failed to parse,"
' for <cron> entry with url %s.'
% (cron.schedule, cron.url))
class _RetryParameters(object):
"""Object that contains retry xml tags converted to object attributes."""
def GetYamlStatementsList(self):
"""Converts retry parameter fields to a YAML statement list."""
tag_statements = []
field_names = (tag.replace('-', '_') for tag in _RETRY_PARAMETER_TAGS)
for field in field_names:
field_value = getattr(self, field, None)
if field_value:
tag_statements.append(' %s: %s' % (field, field_value))
if not tag_statements:
return [' retry_parameters: {}']
return [' retry_parameters:'] + tag_statements
class Cron(object):
"""Instances contain information about individual cron entries."""
TZ_GMT = 'UTC'
def ToYaml(self):
"""Returns data from Cron object as a list of Yaml statements."""
statements = [
'- url: %s' % self._SanitizeForYaml(self.url),
' schedule: %s' % self._SanitizeForYaml(self.schedule)]
for optional in ('target', 'timezone', 'description'):
field = getattr(self, optional)
if field:
statements.append(' %s: %s' % (optional, self._SanitizeForYaml(field)))
retry_parameters = getattr(self, 'retry_parameters', None)
if retry_parameters:
statements += retry_parameters.GetYamlStatementsList()
return statements
def _SanitizeForYaml(self, field):
return "'%s'" % field.replace('\n', ' ').replace("'", "''")