summaryrefslogtreecommitdiffstats
path: root/awx/lib/site-packages/celery/loaders/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'awx/lib/site-packages/celery/loaders/base.py')
-rw-r--r--awx/lib/site-packages/celery/loaders/base.py256
1 files changed, 256 insertions, 0 deletions
diff --git a/awx/lib/site-packages/celery/loaders/base.py b/awx/lib/site-packages/celery/loaders/base.py
new file mode 100644
index 0000000000..1c3abd467a
--- /dev/null
+++ b/awx/lib/site-packages/celery/loaders/base.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+"""
+ celery.loaders.base
+ ~~~~~~~~~~~~~~~~~~~
+
+ Loader base class.
+
+"""
+from __future__ import absolute_import
+
+import anyjson
+import importlib
+import os
+import re
+import sys
+
+from datetime import datetime
+
+from kombu.utils import cached_property
+from kombu.utils.encoding import safe_str
+
+from celery.datastructures import DictAttribute
+from celery.exceptions import ImproperlyConfigured
+from celery.utils.imports import (
+ import_from_cwd, symbol_by_name, NotAPackage, find_module,
+)
+from celery.utils.functional import maybe_list
+
+ERROR_ENVVAR_NOT_SET = """\
+The environment variable %r is not set,
+and as such the configuration could not be loaded.
+Please set this variable and make it point to
+a configuration module."""
+
+CONFIG_INVALID_NAME = """
+Error: Module '%(module)s' doesn't exist, or it's not a valid \
+Python module name.
+"""
+
+CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """
+Did you mean '%(suggest)s'?
+"""
+
+
+class BaseLoader(object):
+ """The base class for loaders.
+
+ Loaders handles,
+
+ * Reading celery client/worker configurations.
+
+ * What happens when a task starts?
+ See :meth:`on_task_init`.
+
+ * What happens when the worker starts?
+ See :meth:`on_worker_init`.
+
+ * What happens when the worker shuts down?
+ See :meth:`on_worker_shutdown`.
+
+ * What modules are imported to find tasks?
+
+ """
+ builtin_modules = frozenset()
+ configured = False
+ error_envvar_not_set = ERROR_ENVVAR_NOT_SET
+ override_backends = {}
+ worker_initialized = False
+
+ _conf = None
+
+ def __init__(self, app=None, **kwargs):
+ from celery.app import app_or_default
+ self.app = app_or_default(app)
+ self.task_modules = set()
+
+ def now(self, utc=True):
+ if utc:
+ return datetime.utcnow()
+ return datetime.now()
+
+ def on_task_init(self, task_id, task):
+ """This method is called before a task is executed."""
+ pass
+
+ def on_process_cleanup(self):
+ """This method is called after a task is executed."""
+ pass
+
+ def on_worker_init(self):
+ """This method is called when the worker (:program:`celery worker`)
+ starts."""
+ pass
+
+ def on_worker_shutdown(self):
+ """This method is called when the worker (:program:`celery worker`)
+ shuts down."""
+ pass
+
+ def on_worker_process_init(self):
+ """This method is called when a child process starts."""
+ pass
+
+ def import_task_module(self, module):
+ self.task_modules.add(module)
+ return self.import_from_cwd(module)
+
+ def import_module(self, module, package=None):
+ return importlib.import_module(module, package=package)
+
+ def import_from_cwd(self, module, imp=None, package=None):
+ return import_from_cwd(
+ module,
+ self.import_module if imp is None else imp,
+ package=package,
+ )
+
+ def import_default_modules(self):
+ return [
+ self.import_task_module(m) for m in (
+ tuple(self.builtin_modules) +
+ tuple(maybe_list(self.app.conf.CELERY_IMPORTS)) +
+ tuple(maybe_list(self.app.conf.CELERY_INCLUDE))
+ )
+ ]
+
+ def init_worker(self):
+ if not self.worker_initialized:
+ self.worker_initialized = True
+ self.import_default_modules()
+ self.on_worker_init()
+
+ def shutdown_worker(self):
+ self.on_worker_shutdown()
+
+ def init_worker_process(self):
+ self.on_worker_process_init()
+
+ def config_from_envvar(self, variable_name, silent=False):
+ module_name = os.environ.get(variable_name)
+ if not module_name:
+ if silent:
+ return False
+ raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
+ return self.config_from_object(module_name, silent=silent)
+
+ def config_from_object(self, obj, silent=False):
+ if isinstance(obj, basestring):
+ try:
+ if '.' in obj:
+ obj = symbol_by_name(obj, imp=self.import_from_cwd)
+ else:
+ obj = self.import_from_cwd(obj)
+ except (ImportError, AttributeError):
+ if silent:
+ return False
+ raise
+ if not hasattr(obj, '__getitem__'):
+ obj = DictAttribute(obj)
+ self._conf = obj
+ return True
+
+ def _import_config_module(self, name):
+ try:
+ self.find_module(name)
+ except NotAPackage:
+ if name.endswith('.py'):
+ raise NotAPackage, NotAPackage(CONFIG_WITH_SUFFIX % {
+ 'module': name, 'suggest': name[:-3]}), sys.exc_info()[2]
+ raise NotAPackage, NotAPackage(
+ CONFIG_INVALID_NAME % {'module': name}), sys.exc_info()[2]
+ else:
+ return self.import_from_cwd(name)
+
+ def find_module(self, module):
+ return find_module(module)
+
+ def cmdline_config_parser(
+ self, args, namespace='celery',
+ re_type=re.compile(r'\((\w+)\)'),
+ extra_types={'json': anyjson.loads},
+ override_types={'tuple': 'json',
+ 'list': 'json',
+ 'dict': 'json'}):
+ from celery.app.defaults import Option, NAMESPACES
+ namespace = namespace.upper()
+ typemap = dict(Option.typemap, **extra_types)
+
+ def getarg(arg):
+ """Parse a single configuration definition from
+ the command line."""
+
+ ## find key/value
+ # ns.key=value|ns_key=value (case insensitive)
+ key, value = arg.split('=', 1)
+ key = key.upper().replace('.', '_')
+
+ ## find namespace.
+ # .key=value|_key=value expands to default namespace.
+ if key[0] == '_':
+ ns, key = namespace, key[1:]
+ else:
+ # find namespace part of key
+ ns, key = key.split('_', 1)
+
+ ns_key = (ns and ns + '_' or '') + key
+
+ # (type)value makes cast to custom type.
+ cast = re_type.match(value)
+ if cast:
+ type_ = cast.groups()[0]
+ type_ = override_types.get(type_, type_)
+ value = value[len(cast.group()):]
+ value = typemap[type_](value)
+ else:
+ try:
+ value = NAMESPACES[ns][key].to_python(value)
+ except ValueError, exc:
+ # display key name in error message.
+ raise ValueError('%r: %s' % (ns_key, exc))
+ return ns_key, value
+ return dict(getarg(v) for v in args)
+
+ def mail_admins(self, subject, body, fail_silently=False,
+ sender=None, to=None, host=None, port=None,
+ user=None, password=None, timeout=None,
+ use_ssl=False, use_tls=False):
+ message = self.mail.Message(sender=sender, to=to,
+ subject=safe_str(subject),
+ body=safe_str(body))
+ mailer = self.mail.Mailer(host=host, port=port,
+ user=user, password=password,
+ timeout=timeout, use_ssl=use_ssl,
+ use_tls=use_tls)
+ mailer.send(message, fail_silently=fail_silently)
+
+ def read_configuration(self):
+ try:
+ custom_config = os.environ['CELERY_CONFIG_MODULE']
+ except KeyError:
+ pass
+ else:
+ usercfg = self._import_config_module(custom_config)
+ return DictAttribute(usercfg)
+ return {}
+
+ @property
+ def conf(self):
+ """Loader configuration."""
+ if self._conf is None:
+ self._conf = self.read_configuration()
+ return self._conf
+
+ @cached_property
+ def mail(self):
+ return self.import_module('celery.utils.mail')