summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRyan Petrello <rpetrell@redhat.com>2017-03-30 20:47:48 +0200
committerRyan Petrello <rpetrell@redhat.com>2017-04-21 21:42:26 +0200
commitba259e0ad42209c039d3ae00075ce5bc6c57c8ba (patch)
treec0ce3577cc8642574d62f0ce0241abe60a5ffa42
parentMerge pull request #6056 from AlanCoding/another_v1 (diff)
downloadawx-ba259e0ad42209c039d3ae00075ce5bc6c57c8ba.tar.xz
awx-ba259e0ad42209c039d3ae00075ce5bc6c57c8ba.zip
Introduce a new CredentialTemplate model
Credentials now have a required CredentialType, which defines inputs (i.e., username, password) and injectors (i.e., assign the username to SOME_ENV_VARIABLE at job runtime) This commit only implements the model changes necessary to support the new inputs model, and includes code for the credential serializer that allows backwards-compatible support for /api/v1/credentials/; tasks.py still needs to be updated to actually respect CredentialType injectors. This change *will* break the UI for credentials (because it needs to be updated to use the new v2 endpoint). see: #5877 see: #5876 see: #5805
-rw-r--r--awx/api/fields.py23
-rw-r--r--awx/api/filters.py15
-rw-r--r--awx/api/serializers.py144
-rw-r--r--awx/api/urls.py11
-rw-r--r--awx/api/versioning.py14
-rw-r--r--awx/api/views.py19
-rw-r--r--awx/main/access.py31
-rw-r--r--awx/main/fields.py242
-rw-r--r--awx/main/management/commands/create_preload_data.py10
-rw-r--r--awx/main/migrations/0039_v320_migrate_credentials_to_credentialtypes.py148
-rw-r--r--awx/main/migrations/_credentialtypes.py5
-rw-r--r--awx/main/models/ad_hoc_commands.py2
-rw-r--r--awx/main/models/base.py16
-rw-r--r--awx/main/models/credential.py963
-rw-r--r--awx/main/models/jobs.py2
-rw-r--r--awx/main/tests/factories/fixtures.py11
-rw-r--r--awx/main/tests/factories/tower.py4
-rw-r--r--awx/main/tests/functional/api/test_credential.py1255
-rw-r--r--awx/main/tests/functional/api/test_credential_type.py205
-rw-r--r--awx/main/tests/functional/api/test_job_runtime_params.py11
-rw-r--r--awx/main/tests/functional/conftest.py39
-rw-r--r--awx/main/tests/functional/test_credential.py289
-rw-r--r--awx/main/tests/functional/test_db_credential.py6
-rw-r--r--awx/main/tests/functional/test_rbac_credential.py10
-rw-r--r--awx/main/tests/functional/test_rbac_job_start.py11
-rw-r--r--awx/main/tests/functional/test_rbac_project.py4
-rw-r--r--awx/main/tests/unit/models/test_workflow_unit.py16
-rw-r--r--awx/main/tests/unit/test_network_credential.py53
-rw-r--r--awx/settings/defaults.py1
-rwxr-xr-xtools/data_generators/rbac_dummy_data_generator.py4
30 files changed, 3100 insertions, 464 deletions
diff --git a/awx/api/fields.py b/awx/api/fields.py
index f1acaa7e72..dd811d81a6 100644
--- a/awx/api/fields.py
+++ b/awx/api/fields.py
@@ -1,13 +1,11 @@
# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved.
-# Django
-from django.utils.encoding import force_text
# Django REST Framework
from rest_framework import serializers
-__all__ = ['BooleanNullField', 'CharNullField', 'ChoiceNullField', 'EncryptedPasswordField', 'VerbatimField']
+__all__ = ['BooleanNullField', 'CharNullField', 'ChoiceNullField', 'VerbatimField']
class NullFieldMixin(object):
@@ -58,25 +56,6 @@ class ChoiceNullField(NullFieldMixin, serializers.ChoiceField):
return super(ChoiceNullField, self).to_internal_value(data or u'')
-class EncryptedPasswordField(CharNullField):
- '''
- Custom field to handle encrypted password values (on credentials).
- '''
-
- def to_internal_value(self, data):
- value = super(EncryptedPasswordField, self).to_internal_value(data or u'')
- # If user submits a value starting with $encrypted$, ignore it.
- if force_text(value).startswith('$encrypted$'):
- raise serializers.SkipField
- return value
-
- def to_representation(self, value):
- # Replace the actual encrypted value with the string $encrypted$.
- if force_text(value).startswith('$encrypted$'):
- return '$encrypted$'
- return value
-
-
class VerbatimField(serializers.Field):
'''
Custom field that passes the value through without changes.
diff --git a/awx/api/filters.py b/awx/api/filters.py
index 77e04e1bd6..b28c311e26 100644
--- a/awx/api/filters.py
+++ b/awx/api/filters.py
@@ -6,7 +6,7 @@ import re
import json
# Django
-from django.core.exceptions import FieldError, ValidationError
+from django.core.exceptions import FieldError, ValidationError, ObjectDoesNotExist
from django.db import models
from django.db.models import Q
from django.db.models.fields import FieldDoesNotExist
@@ -22,6 +22,7 @@ from rest_framework.filters import BaseFilterBackend
# Ansible Tower
from awx.main.utils import get_type_for_model, to_python_boolean
+from awx.main.models.credential import CredentialType
from awx.main.models.rbac import RoleAncestorEntry
@@ -161,6 +162,18 @@ class FieldLookupBackend(BaseFilterBackend):
except UnicodeEncodeError:
raise ValueError("%r is not an allowed field name. Must be ascii encodable." % lookup)
+ # Make legacy v1 Credential fields work for backwards compatability
+ # TODO: remove after API v1 deprecation period
+ if model._meta.object_name == 'Credential' and lookup == 'kind':
+ try:
+ type_ = CredentialType.from_v1_kind(value)
+ if type_ is None:
+ raise ParseError(_('cannot filter on kind %s') % value)
+ value = type_.pk
+ lookup = 'credential_type'
+ except ObjectDoesNotExist as e:
+ raise ParseError(_('cannot filter on kind %s') % value)
+
field, new_lookup = self.get_field_from_lookup(model, lookup)
# Type names are stored without underscores internally, but are presented and
diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index d317b75f65..8ea31741d7 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -48,8 +48,8 @@ from awx.main.utils import (
from awx.main.validators import vars_validate_or_raise
from awx.conf.license import feature_enabled
-from awx.api.versioning import reverse
-from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, EncryptedPasswordField, VerbatimField
+from awx.api.versioning import reverse, get_request_version
+from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, VerbatimField
logger = logging.getLogger('awx.api.serializers')
@@ -243,6 +243,12 @@ class BaseSerializer(serializers.ModelSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
+ @property
+ def version(self):
+ """
+ The request version component of the URL as an integer i.e., 1 or 2
+ """
+ return get_request_version(self.context.get('request'))
def get_type(self, obj):
return get_type_for_model(self.Meta.model)
@@ -309,7 +315,18 @@ class BaseSerializer(serializers.ModelSerializer):
continue
summary_fields[fk] = OrderedDict()
for field in related_fields:
+
fval = getattr(fkval, field, None)
+
+ # TODO: remove when API v1 is removed
+ if all([
+ self.version == 1,
+ 'credential' in fk,
+ field == 'kind',
+ fval == 'machine'
+ ]):
+ fval = 'ssh'
+
if fval is None and field == 'type':
if isinstance(fkval, PolymorphicModel):
fkval = fkval.get_real_instance()
@@ -1819,25 +1836,76 @@ class ResourceAccessListElementSerializer(UserSerializer):
return ret
-class CredentialSerializer(BaseSerializer):
+class CredentialTypeSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete']
class Meta:
+ model = CredentialType
+ fields = ('*', 'kind', 'name', 'managed_by_tower', 'inputs',
+ 'injectors')
+
+
+# TODO: remove when API v1 is removed
+@six.add_metaclass(BaseSerializerMetaclass)
+class V1CredentialFields(BaseSerializer):
+
+ class Meta:
model = Credential
fields = ('*', 'kind', 'cloud', 'host', 'username',
'password', 'security_token', 'project', 'domain',
- 'ssh_key_data', 'ssh_key_unlock', 'organization',
- 'become_method', 'become_username', 'become_password',
- 'vault_password', 'subscription', 'tenant', 'secret', 'client',
- 'authorize', 'authorize_password')
+ 'ssh_key_data', 'ssh_key_unlock', 'become_method',
+ 'become_username', 'become_password', 'vault_password',
+ 'subscription', 'tenant', 'secret', 'client', 'authorize',
+ 'authorize_password')
- def build_standard_field(self, field_name, model_field):
- field_class, field_kwargs = super(CredentialSerializer, self).build_standard_field(field_name, model_field)
- if field_name in Credential.PASSWORD_FIELDS:
- field_class = EncryptedPasswordField
- field_kwargs['required'] = False
- field_kwargs['default'] = ''
- return field_class, field_kwargs
+ def build_field(self, field_name, info, model_class, nested_depth):
+ if field_name in V1Credential.FIELDS:
+ return self.build_standard_field(field_name,
+ V1Credential.FIELDS[field_name])
+ return super(V1CredentialFields, self).build_field(field_name, info, model_class, nested_depth)
+
+
+@six.add_metaclass(BaseSerializerMetaclass)
+class V2CredentialFields(BaseSerializer):
+
+ class Meta:
+ model = Credential
+ fields = ('*', 'credential_type', 'inputs')
+
+
+class CredentialSerializer(BaseSerializer):
+ show_capabilities = ['edit', 'delete']
+
+ class Meta:
+ model = Credential
+ fields = ('*', 'organization')
+
+ def get_fields(self):
+ fields = super(CredentialSerializer, self).get_fields()
+
+ # TODO: remove when API v1 is removed
+ if self.version == 1:
+ fields.update(V1CredentialFields().get_fields())
+ else:
+ fields.update(V2CredentialFields().get_fields())
+ return fields
+
+ def to_representation(self, data):
+ value = super(CredentialSerializer, self).to_representation(data)
+
+ # TODO: remove when API v1 is removed
+ if self.version == 1:
+ if value.get('kind') == 'machine':
+ value['kind'] = 'ssh'
+
+ for field in V1Credential.PASSWORD_FIELDS:
+ if field in value and force_text(value[field]).startswith('$encrypted$'):
+ value[field] = '$encrypted$'
+
+ for k, v in value.get('inputs', {}).items():
+ if force_text(v).startswith('$encrypted$'):
+ value['inputs'][k] = '$encrypted$'
+ return value
def get_related(self, obj):
res = super(CredentialSerializer, self).get_related(obj)
@@ -1853,6 +1921,12 @@ class CredentialSerializer(BaseSerializer):
owner_teams = self.reverse('api:credential_owner_teams_list', kwargs={'pk': obj.pk}),
))
+ # TODO: remove when API v1 is removed
+ if self.version > 1:
+ res.update(dict(
+ credential_type = self.reverse('api:credential_type_detail', kwargs={'pk': obj.credential_type.pk}),
+ ))
+
parents = [role for role in obj.admin_role.parents.all() if role.object_id is not None]
if parents:
res.update({parents[0].content_type.name:parents[0].content_object.get_absolute_url(self.context.get('request'))})
@@ -1886,6 +1960,35 @@ class CredentialSerializer(BaseSerializer):
return summary_dict
+ def get_validation_exclusions(self, obj=None):
+ # CredentialType is now part of validation; legacy v1 fields (e.g.,
+ # 'username', 'password') in JSON POST payloads use the
+ # CredentialType's inputs definition to determine their validity
+ ret = super(CredentialSerializer, self).get_validation_exclusions(obj)
+ for field in ('credential_type', 'inputs'):
+ if field in ret:
+ ret.remove(field)
+ return ret
+
+ def to_internal_value(self, data):
+ if 'credential_type' not in data:
+ # If `credential_type` is not provided, assume the payload is a
+ # v1 credential payload that specifies a `kind` and a flat list
+ # of field values
+ #
+ # In this scenario, we should automatically detect the proper
+ # CredentialType based on the provided values
+ kind = data.get('kind', 'ssh')
+ credential_type = CredentialType.from_v1_kind(kind, data)
+ data['credential_type'] = credential_type.pk
+ value = OrderedDict(
+ {'credential_type': credential_type}.items() +
+ super(CredentialSerializer, self).to_internal_value(data).items()
+ )
+ value.pop('kind', None)
+ return value
+ return super(CredentialSerializer, self).to_internal_value(data)
+
class CredentialSerializerCreate(CredentialSerializer):
@@ -1926,7 +2029,20 @@ class CredentialSerializerCreate(CredentialSerializer):
team = validated_data.pop('team', None)
if team:
validated_data['organization'] = team.organization
+
+ # If our payload contains v1 credential fields, translate to the new
+ # model
+ # TODO: remove when API v1 is removed
+ if self.version == 1:
+ for attr in (
+ set(V1Credential.FIELDS) & set(validated_data.keys()) # set intersection
+ ):
+ validated_data.setdefault('inputs', {})
+ value = validated_data.pop(attr)
+ if value:
+ validated_data['inputs'][attr] = value
credential = super(CredentialSerializerCreate, self).create(validated_data)
+
if user:
credential.admin_role.members.add(user)
if team:
diff --git a/awx/api/urls.py b/awx/api/urls.py
index fda53adf1d..eefd4b22f1 100644
--- a/awx/api/urls.py
+++ b/awx/api/urls.py
@@ -164,6 +164,11 @@ inventory_script_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/object_roles/$', 'inventory_script_object_roles_list'),
)
+credential_type_urls = patterns('awx.api.views',
+ url(r'^$', 'credential_type_list'),
+ url(r'^(?P<pk>[0-9]+)/$', 'credential_type_detail'),
+)
+
credential_urls = patterns('awx.api.views',
url(r'^$', 'credential_list'),
url(r'^(?P<pk>[0-9]+)/activity_stream/$', 'credential_activity_stream_list'),
@@ -378,7 +383,13 @@ v1_urls = patterns('awx.api.views',
url(r'^activity_stream/', include(activity_stream_urls)),
)
+v2_urls = patterns('awx.api.views',
+ url(r'^$', 'api_version_root_view'),
+ url(r'^credential_types/', include(credential_type_urls)),
+)
+
urlpatterns = patterns('awx.api.views',
url(r'^$', 'api_root_view'),
+ url(r'^(?P<version>(v2))/', include(v2_urls)),
url(r'^(?P<version>(v1|v2))/', include(v1_urls))
)
diff --git a/awx/api/versioning.py b/awx/api/versioning.py
index 5eeef2d689..9bf281d5ba 100644
--- a/awx/api/versioning.py
+++ b/awx/api/versioning.py
@@ -1,10 +1,22 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.
+from django.conf import settings
+
from rest_framework.reverse import reverse as drf_reverse
from rest_framework.versioning import URLPathVersioning as BaseVersioning
+def get_request_version(request):
+ """
+ The API version of a request as an integer i.e., 1 or 2
+ """
+ version = settings.REST_FRAMEWORK['DEFAULT_VERSION']
+ if request and hasattr(request, 'version'):
+ version = request.version
+ return int(version.lstrip('v'))
+
+
def reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra):
if request is None or getattr(request, 'version', None) is None:
# We need the "current request" to determine the correct version to
@@ -13,7 +25,7 @@ def reverse(viewname, args=None, kwargs=None, request=None, format=None, **extra
if kwargs is None:
kwargs = {}
if 'version' not in kwargs:
- kwargs['version'] = 'v2'
+ kwargs['version'] = settings.REST_FRAMEWORK['DEFAULT_VERSION']
return drf_reverse(viewname, args, kwargs, request, format, **extra)
diff --git a/awx/api/views.py b/awx/api/views.py
index 742b5db485..8701c7bab3 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -64,7 +64,7 @@ from awx.main.ha import is_ha_environment
from awx.api.authentication import TaskAuthentication, TokenGetAuthentication
from awx.api.generics import get_view_name
from awx.api.generics import * # noqa
-from awx.api.versioning import reverse
+from awx.api.versioning import reverse, get_request_version
from awx.conf.license import get_license, feature_enabled, feature_exists, LicenseForbids
from awx.main.models import * # noqa
from awx.main.utils import * # noqa
@@ -155,7 +155,6 @@ class ApiVersionRootView(APIView):
def get(self, request, format=None):
''' list top level resources '''
-
data = OrderedDict()
data['authtoken'] = reverse('api:auth_token_view', request=request)
data['ping'] = reverse('api:api_v1_ping_view', request=request)
@@ -169,6 +168,8 @@ class ApiVersionRootView(APIView):
data['project_updates'] = reverse('api:project_update_list', request=request)
data['teams'] = reverse('api:team_list', request=request)
data['credentials'] = reverse('api:credential_list', request=request)
+ if get_request_version(request) > 1:
+ data['credential_types'] = reverse('api:credential_type_list', request=request)
data['inventory'] = reverse('api:inventory_list', request=request)
data['inventory_scripts'] = reverse('api:inventory_script_list', request=request)
data['inventory_sources'] = reverse('api:inventory_source_list', request=request)
@@ -1476,6 +1477,20 @@ class UserAccessList(ResourceAccessList):
new_in_300 = True
+class CredentialTypeList(ListCreateAPIView):
+
+ model = CredentialType
+ serializer_class = CredentialTypeSerializer
+ new_in_320 = True
+
+
+class CredentialTypeDetail(RetrieveUpdateDestroyAPIView):
+
+ model = CredentialType
+ serializer_class = CredentialTypeSerializer
+ new_in_320 = True
+
+
class CredentialList(ListCreateAPIView):
model = Credential
diff --git a/awx/main/access.py b/awx/main/access.py
index 7f0d4d258d..4c7dba60fe 100644
--- a/awx/main/access.py
+++ b/awx/main/access.py
@@ -824,6 +824,36 @@ class InventoryUpdateAccess(BaseAccess):
return self.user in obj.inventory_source.inventory.admin_role
+class CredentialTypeAccess(BaseAccess):
+ '''
+ I can see credentials types when:
+ - I'm authenticated
+ I can create when:
+ - I'm a superuser:
+ I can change when:
+ - I'm a superuser and the type is not "managed by Tower"
+ I can change/delete when:
+ - I'm a superuser and the type is not "managed by Tower"
+ '''
+
+ model = CredentialType
+
+ def can_read(self, obj):
+ return True
+
+ def can_use(self, obj):
+ return True
+
+ def can_add(self, data):
+ return self.user.is_superuser
+
+ def can_change(self, obj, data):
+ return self.user.is_superuser and not obj.managed_by_tower
+
+ def can_delete(self, obj):
+ return self.user.is_superuser and not obj.managed_by_tower
+
+
class CredentialAccess(BaseAccess):
'''
I can see credentials when:
@@ -2282,6 +2312,7 @@ register_access(Group, GroupAccess)
register_access(InventorySource, InventorySourceAccess)
register_access(InventoryUpdate, InventoryUpdateAccess)
register_access(Credential, CredentialAccess)
+register_access(CredentialType, CredentialTypeAccess)
register_access(Team, TeamAccess)
register_access(Project, ProjectAccess)
register_access(ProjectUpdate, ProjectUpdateAccess)
diff --git a/awx/main/fields.py b/awx/main/fields.py
index 0cb8b0e1d4..ab4f2cb1c5 100644
--- a/awx/main/fields.py
+++ b/awx/main/fields.py
@@ -2,13 +2,18 @@
# All Rights Reserved.
# Python
+import copy
import json
import re
import sys
import six
from pyparsing import infixNotation, opAssoc, Optional, Literal, CharsNotIn
+from jinja2 import Environment, StrictUndefined
+from jinja2.exceptions import UndefinedError
+
# Django
+from django.core import exceptions as django_exceptions
from django.db.models.signals import (
post_save,
post_delete,
@@ -24,6 +29,10 @@ from django.db.models.fields.related import (
)
from django.utils.encoding import smart_text
from django.db.models import Q
+from django.utils.translation import ugettext_lazy as _
+
+# jsonschema
+from jsonschema import Draft4Validator
# Django-JSONField
from jsonfield import JSONField as upstream_JSONField
@@ -526,3 +535,236 @@ class DynamicFilterField(models.TextField):
raise RuntimeError("Parsing the filter_string %s went terribly wrong" % filter_string)
+class JSONSchemaField(JSONBField):
+ """
+ A JSONB field that self-validates against a defined JSON schema
+ (http://json-schema.org). This base class is intended to be overwritten by
+ defining `self.schema`.
+ """
+
+ # If an empty {} is provided, we still want to perform this schema
+ # validation
+ empty_values=(None, '')
+
+ def get_default(self):
+ return copy.deepcopy(super(JSONBField, self).get_default())
+
+ def schema(self, model_instance):
+ raise NotImplementedError()
+
+ def validate(self, value, model_instance):
+ super(JSONSchemaField, self).validate(value, model_instance)
+ errors = []
+ for error in Draft4Validator(self.schema(model_instance)).iter_errors(value):
+ errors.append(error)
+
+ if errors:
+ raise django_exceptions.ValidationError(
+ [e.message for e in errors],
+ code='invalid',
+ params={'value': value},
+ )
+
+ def get_db_prep_value(self, value, connection, prepared=False):
+ if connection.vendor == 'sqlite':
+ # sqlite (which we use for tests) does not support jsonb;
+ return json.dumps(value)
+ return super(JSONSchemaField, self).get_db_prep_value(
+ value, connection, prepared
+ )
+
+ def from_db_value(self, value, expression, connection, context):
+ # Work around a bug in django-jsonfield
+ # https://bitbucket.org/schinckel/django-jsonfield/issues/57/cannot-use-in-the-same-project-as-djangos
+ if isinstance(value, six.string_types):
+ return json.loads(value)
+ return value
+
+
+class CredentialInputField(JSONSchemaField):
+ """
+ Used to validate JSON for
+ `awx.main.models.credential:Credential().inputs`.
+
+ Input data for credentials is represented as a dictionary e.g.,
+ {'api_token': 'abc123', 'api_secret': 'SECRET'}
+
+ For the data to be valid, the keys of this dictionary should correspond
+ with the field names (and datatypes) defined in the associated
+ CredentialType e.g.,
+
+ {
+ 'fields': [{
+ 'id': 'api_token',
+ 'label': 'API Token',
+ 'type': 'string'
+ }, {
+ 'id': 'api_secret',
+ 'label': 'API Secret',
+ 'type': 'string'
+ }]
+ }
+ """
+
+ def schema(self, model_instance):
+ # determine the defined fields for the associated credential type
+ properties = {}
+ for field in model_instance.credential_type.inputs.get('fields', []):
+ field = field.copy()
+ properties[field.pop('id')] = field
+ return {
+ 'type': 'object',
+ 'properties': properties,
+ 'additionalProperties': False,
+ }
+
+ def validate(self, value, model_instance):
+ super(CredentialInputField, self).validate(
+ value, model_instance
+ )
+
+ errors = []
+ inputs = model_instance.credential_type.inputs
+ for field in inputs.get('required', []):
+ if not value.get(field, None):
+ errors.append(
+ _('%s required for %s credential.') % (
+ field, model_instance.credential_type.name
+ )
+ )
+
+ if errors:
+ raise django_exceptions.ValidationError(
+ errors,
+ code='invalid',
+ params={'value': value},
+ )
+
+
+class CredentialTypeInputField(JSONSchemaField):
+ """
+ Used to validate JSON for
+ `awx.main.models.credential:CredentialType().inputs`.
+ """
+
+ def schema(self, model_instance):
+ return {
+ 'type': 'object',
+ 'additionalProperties': False,
+ 'properties': {
+ 'fields': {
+ 'type': 'array',
+ 'items': {
+ 'type': 'object',
+ 'properties': {
+ 'type': {'enum': ['string', 'number', 'ssh_private_key']},
+ 'choices': {
+ 'type': 'array',
+ 'minItems': 1,
+ 'items': {'type': 'string'},
+ 'uniqueItems': True
+ },
+ 'id': {'type': 'string'},
+ 'label': {'type': 'string'},
+ 'help_text': {'type': 'string'},
+ 'multiline': {'type': 'boolean'},
+ 'secret': {'type': 'boolean'},
+ 'ask_at_runtime': {'type': 'boolean'},
+ },
+ 'additionalProperties': False,
+ 'required': ['id', 'label'],
+ }
+ }
+ }
+ }
+
+
+
+class CredentialTypeInjectorField(JSONSchemaField):
+ """
+ Used to validate JSON for
+ `awx.main.models.credential:CredentialType().injectors`.
+ """
+
+ def schema(self, model_instance):
+ return {
+ 'type': 'object',
+ 'additionalProperties': False,
+ 'properties': {
+ 'file': {
+ 'type': 'object',
+ 'properties': {
+ 'template': {'type': 'string'},
+ },
+ 'additionalProperties': False,
+ 'required': ['template'],
+ },
+ 'ssh': {
+ 'type': 'object',
+ 'properties': {
+ 'private': {'type': 'string'},
+ 'public': {'type': 'string'},
+ },
+ 'additionalProperties': False,
+ 'required': ['public', 'private'],
+ },
+ 'password': {
+ 'type': 'object',
+ 'properties': {
+ 'key': {'type': 'string'},
+ 'value': {'type': 'string'},
+ },
+ 'additionalProperties': False,
+ 'required': ['key', 'value'],
+ },
+ 'env': {
+ 'type': 'object',
+ 'patternProperties': {
+ # http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap08.html
+ # In the shell command language, a word consisting solely
+ # of underscores, digits, and alphabetics from the portable
+ # character set. The first character of a name is not
+ # a digit.
+ '^[a-zA-Z_]+[a-zA-Z0-9_]*$': {'type': 'string'},
+ },
+ 'additionalProperties': False,
+ },
+ 'extra_vars': {
+ 'type': 'object',
+ 'patternProperties': {
+ # http://docs.ansible.com/ansible/playbooks_variables.html#what-makes-a-valid-variable-name
+ '^[a-zA-Z_]+[a-zA-Z0-9_]*$': {'type': 'string'},
+ },
+ 'additionalProperties': False,
+ },
+ },
+ 'additionalProperties': False
+ }
+
+ def validate(self, value, model_instance):
+ super(CredentialTypeInjectorField, self).validate(
+ value, model_instance
+ )
+
+ # make sure the inputs are clean first
+ CredentialTypeInputField().validate(model_instance.inputs, model_instance)
+
+ # In addition to basic schema validation, search the injector fields
+ # for template variables and make sure they match the fields defined in
+ # the inputs
+ valid_namespace = dict(
+ (field, 'EXAMPLE')
+ for field in model_instance.defined_fields
+ )
+ for type_, injector in value.items():
+ for key, tmpl in injector.items():
+ try:
+ Environment(
+ undefined=StrictUndefined
+ ).from_string(tmpl).render(valid_namespace)
+ except UndefinedError as e:
+ raise django_exceptions.ValidationError(
+ _('%s uses an undefined field (%s)') % (key, e),
+ code='invalid',
+ params={'value': value},
+ )
diff --git a/awx/main/management/commands/create_preload_data.py b/awx/main/management/commands/create_preload_data.py
index caeba2c0a3..c2acd7938e 100644
--- a/awx/main/management/commands/create_preload_data.py
+++ b/awx/main/management/commands/create_preload_data.py
@@ -3,7 +3,7 @@
from django.core.management.base import BaseCommand
from crum import impersonate
-from awx.main.models import User, Organization, Project, Inventory, Credential, Host, JobTemplate
+from awx.main.models import User, Organization, Project, Inventory, CredentialType, Credential, Host, JobTemplate
class Command(BaseCommand):
@@ -30,8 +30,12 @@ class Command(BaseCommand):
scm_update_cache_timeout=0,
organization=o)
p.save(skip_update=True)
- c = Credential.objects.create(name='Demo Credential',
- username=superuser.username,
+ ssh_type = CredentialType.from_v1_kind('ssh')
+ c = Credential.objects.create(credential_type=ssh_type,
+ name='Demo Credential',
+ inputs={
+ 'username': superuser.username
+ },
created_by=superuser)
c.admin_role.members.add(superuser)
i = Inventory.objects.create(name='Demo Inventory',
diff --git a/awx/main/migrations/0039_v320_migrate_credentials_to_credentialtypes.py b/awx/main/migrations/0039_v320_migrate_credentials_to_credentialtypes.py
new file mode 100644
index 0000000000..9f0e0ad18c
--- /dev/null
+++ b/awx/main/migrations/0039_v320_migrate_credentials_to_credentialtypes.py
@@ -0,0 +1,148 @@
+# -*- coding: utf-8 -*-
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.db.models.deletion
+from django.conf import settings
+import taggit.managers
+import awx.main.fields
+from awx.main.migrations import _credentialtypes as credentialtypes
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('main', '0038_v320_data_migrations'),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name='CredentialType',
+ fields=[
+ ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
+ ('created', models.DateTimeField(default=None, editable=False)),
+ ('modified', models.DateTimeField(default=None, editable=False)),
+ ('description', models.TextField(default=b'', blank=True)),
+ ('name', models.CharField(max_length=512)),
+ ('kind', models.CharField(max_length=32, choices=[(b'machine', 'Machine'), (b'net', 'Network'), (b'scm', 'Source Control'), (b'cloud', 'Cloud')])),
+ ('managed_by_tower', models.BooleanField(default=False, editable=False)),
+ ('inputs', awx.main.fields.CredentialTypeInputField(default={}, blank=True)),
+ ('injectors', awx.main.fields.CredentialTypeInjectorField(default={}, blank=True)),
+ ('created_by', models.ForeignKey(related_name="{u'class': 'credentialtype', u'app_label': 'main'}(class)s_created+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
+ ('modified_by', models.ForeignKey(related_name="{u'class': 'credentialtype', u'app_label': 'main'}(class)s_modified+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
+ ('tags', taggit.managers.TaggableManager(to='taggit.Tag', through='taggit.TaggedItem', blank=True, help_text='A comma-separated list of tags.', verbose_name='Tags')),
+ ],
+ options={
+ 'ordering': ('kind', 'name'),
+ },
+ ),
+ migrations.AlterModelOptions(
+ name='credential',
+ options={'ordering': ('name',)},
+ ),
+ migrations.AddField(
+ model_name='credential',
+ name='inputs',
+ field=awx.main.fields.CredentialInputField(default={}, blank=True),
+ ),
+ migrations.AddField(
+ model_name='credential',
+ name='credential_type',
+ field=models.ForeignKey(related_name='credentials', to='main.CredentialType', null=True),
+ preserve_default=False,
+ ),
+ migrations.AlterUniqueTogether(
+ name='credential',
+ unique_together=set([('organization', 'name', 'credential_type')]),
+ ),
+ migrations.RunPython(credentialtypes.create_tower_managed_credential_types),
+ # MIGRATION TODO: For each credential, look at the columns below to
+ # determine the appropriate CredentialType (and assign it). Additionally,
+ # set `self.input` to the appropriate JSON blob
+ migrations.RemoveField(
+ model_name='credential',
+ name='authorize',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='authorize_password',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='become_method',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='become_password',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='become_username',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='client',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='cloud',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='domain',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='host',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='kind',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='password',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='project',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='secret',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='security_token',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='ssh_key_data',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='ssh_key_unlock',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='subscription',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='tenant',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='username',
+ ),
+ migrations.RemoveField(
+ model_name='credential',
+ name='vault_password',
+ ),
+ migrations.AlterUniqueTogether(
+ name='credentialtype',
+ unique_together=set([('name', 'kind')]),
+ ),
+ # MIGRATION TODO: Once credentials are migrated, alter the credential_type
+ # foreign key to be non-NULLable
+ ]
diff --git a/awx/main/migrations/_credentialtypes.py b/awx/main/migrations/_credentialtypes.py
new file mode 100644
index 0000000000..1e9bfbf0bf
--- /dev/null
+++ b/awx/main/migrations/_credentialtypes.py
@@ -0,0 +1,5 @@
+from awx.main.models import CredentialType
+
+
+def create_tower_managed_credential_types(apps, schema_editor):
+ CredentialType.setup_tower_managed_defaults()
diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py
index 57fb071c53..4dba6fb48a 100644
--- a/awx/main/models/ad_hoc_commands.py
+++ b/awx/main/models/ad_hoc_commands.py
@@ -99,7 +99,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
def clean_credential(self):
cred = self.credential
- if cred and cred.kind != 'ssh':
+ if cred and cred.kind != 'machine':
raise ValidationError(
_('You must provide a machine / SSH credential.'),
)
diff --git a/awx/main/models/base.py b/awx/main/models/base.py
index 81e00f92c6..51152aeaaf 100644
--- a/awx/main/models/base.py
+++ b/awx/main/models/base.py
@@ -229,10 +229,8 @@ class PasswordFieldsModel(BaseModel):
setattr(self, field, '')
else:
ask = self._password_field_allows_ask(field)
- encrypted = encrypt_field(self, field, ask)
- setattr(self, field, encrypted)
- if field not in update_fields:
- update_fields.append(field)
+ self.encrypt_field(field, ask)
+ self.mark_field_for_save(update_fields, field)
super(PasswordFieldsModel, self).save(*args, **kwargs)
# After saving a new instance for the first time, set the password
# fields and save again.
@@ -241,9 +239,17 @@ class PasswordFieldsModel(BaseModel):
for field in self.PASSWORD_FIELDS:
saved_value = getattr(self, '_saved_%s' % field, '')
setattr(self, field, saved_value)
- update_fields.append(field)
+ self.mark_field_for_save(update_fields, field)
self.save(update_fields=update_fields)
+ def encrypt_field(self, field, ask):
+ encrypted = encrypt_field(self, field, ask)
+ setattr(self, field, encrypted)
+
+ def mark_field_for_save(self, update_fields, field):
+ if field not in update_fields:
+ update_fields.append(field)
+
class PrimordialModel(CreatedModifiedModel):
'''
diff --git a/awx/main/models/credential.py b/awx/main/models/credential.py
index ab4d5f3050..e16fddb611 100644
--- a/awx/main/models/credential.py
+++ b/awx/main/models/credential.py
@@ -1,5 +1,8 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
+from collections import OrderedDict
+import functools
+import operator
# Django
from django.db import models
@@ -8,8 +11,9 @@ from django.core.exceptions import ValidationError
# AWX
from awx.api.versioning import reverse
-from awx.main.fields import ImplicitRoleField
-from awx.main.constants import CLOUD_PROVIDERS
+from awx.main.fields import (ImplicitRoleField, CredentialInputField,
+ CredentialTypeInputField,
+ CredentialTypeInjectorField)
from awx.main.utils import decrypt_field
from awx.main.validators import validate_ssh_private_key
from awx.main.models.base import * # noqa
@@ -18,8 +22,176 @@ from awx.main.models.rbac import (
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
ROLE_SINGLETON_SYSTEM_AUDITOR,
)
+from awx.main.utils import encrypt_field
+
+__all__ = ['Credential', 'CredentialType', 'V1Credential']
+
+
+class V1Credential(object):
+
+ #
+ # API v1 backwards compat; as long as we continue to support the
+ # /api/v1/credentials/ endpoint, we'll keep these definitions around.
+ # The credential serializers are smart enough to detect the request
+ # version and use *these* fields for constructing the serializer if the URL
+ # starts with /api/v1/
+ #
+ PASSWORD_FIELDS = ('password', 'security_token', 'ssh_key_data',
+ 'ssh_key_unlock', 'become_password',
+ 'vault_password', 'secret', 'authorize_password')
+ KIND_CHOICES = [
+ ('ssh', 'Machine'),
+ ('net', 'Network'),
+ ('scm', 'Source Control'),
+ ('aws', 'Amazon Web Services'),
+ ('rax', 'Rackspace'),
+ ('vmware', 'VMware vCenter'),
+ ('satellite6', 'Red Hat Satellite 6'),
+ ('cloudforms', 'Red Hat CloudForms'),
+ ('gce', 'Google Compute Engine'),
+ ('azure', 'Microsoft Azure Classic (deprecated)'),
+ ('azure_rm', 'Microsoft Azure Resource Manager'),
+ ('openstack', 'OpenStack'),
+ ]
+ FIELDS = {
+ 'kind': models.CharField(
+ max_length=32,
+ choices=[
+ (kind[0], _(kind[1]))
+ for kind in KIND_CHOICES
+ ],
+ default='ssh',
+ ),
+ 'cloud': models.BooleanField(
+ default=False,
+ editable=False,
+ ),
+ 'host': models.CharField(
+ blank=True,
+ default='',
+ max_length=1024,
+ verbose_name=_('Host'),
+ help_text=_('The hostname or IP address to use.'),
+ ),
+ 'username': models.CharField(
+ blank=True,
+ default='',
+ max_length=1024,
+ verbose_name=_('Username'),
+ help_text=_('Username for this credential.'),
+ ),
+ 'password': models.CharField(
+ blank=True,
+ default='',
+ max_length=1024,
+ verbose_name=_('Password'),
+ help_text=_('Password for this credential (or "ASK" to prompt the '
+ 'user for machine credentials).'),
+ ),
+ 'security_token': models.CharField(
+ blank=True,
+ default='',
+ max_length=1024,
+ verbose_name=_('Security Token'),
+ help_text=_('Security Token for this credential'),
+ ),
+ 'project': models.CharField(
+ blank=True,
+ default='',
+ max_length=100,
+ verbose_name=_('Project'),
+ help_text=_('The identifier for the project.'),
+ ),
+ 'domain': models.CharField(
+ blank=True,
+ default='',
+ max_length=100,
+ verbose_name=_('Domain'),
+ help_text=_('The identifier for the domain.'),
+ ),
+ 'ssh_key_data': models.TextField(
+ blank=True,
+ default='',
+ verbose_name=_('SSH private key'),
+ help_text=_('RSA or DSA private key to be used instead of password.'),
+ ),
+ 'ssh_key_unlock': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ verbose_name=_('SSH key unlock'),
+ help_text=_('Passphrase to unlock SSH private key if encrypted (or '
+ '"ASK" to prompt the user for machine credentials).'),
+ ),
+ 'become_method': models.CharField(
+ max_length=32,
+ blank=True,
+ default='',
+ choices=[
+ ('', _('None')),
+ ('sudo', _('Sudo')),
+ ('su', _('Su')),
+ ('pbrun', _('Pbrun')),
+ ('pfexec', _('Pfexec')),
+ ('dzdo', _('DZDO')),
+ ('pmrun', _('Pmrun')),
+ ],
+ help_text=_('Privilege escalation method.')
+ ),
+ 'become_username': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ help_text=_('Privilege escalation username.'),
+ ),
+ 'become_password': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ help_text=_('Password for privilege escalation method.')
+ ),
+ 'vault_password': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ help_text=_('Vault password (or "ASK" to prompt the user).'),
+ ),
+ 'authorize': models.BooleanField(
+ default=False,
+ help_text=_('Whether to use the authorize mechanism.'),
+ ),
+ 'authorize_password': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ help_text=_('Password used by the authorize mechanism.'),
+ ),
+ 'client': models.CharField(
+ max_length=128,
+ blank=True,
+ default='',
+ help_text=_('Client Id or Application Id for the credential'),
+ ),
+ 'secret': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ help_text=_('Secret Token for this credential'),
+ ),
+ 'subscription': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ help_text=_('Subscription identifier for this credential'),
+ ),
+ 'tenant': models.CharField(
+ max_length=1024,
+ blank=True,
+ default='',
+ help_text=_('Tenant identifier for this credential'),
+ )
+ }
-__all__ = ['Credential']
class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
@@ -29,39 +201,12 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
If used with sudo, a sudo password should be set if required.
'''
- KIND_CHOICES = [
- ('ssh', _('Machine')),
- ('net', _('Network')),
- ('scm', _('Source Control')),
- ('aws', _('Amazon Web Services')),
- ('rax', _('Rackspace')),
- ('vmware', _('VMware vCenter')),
- ('satellite6', _('Red Hat Satellite 6')),
- ('cloudforms', _('Red Hat CloudForms')),
- ('gce', _('Google Compute Engine')),
- ('azure', _('Microsoft Azure Classic (deprecated)')),
- ('azure_rm', _('Microsoft Azure Resource Manager')),
- ('openstack', _('OpenStack')),
- ]
-
- BECOME_METHOD_CHOICES = [
- ('', _('None')),
- ('sudo', _('Sudo')),
- ('su', _('Su')),
- ('pbrun', _('Pbrun')),
- ('pfexec', _('Pfexec')),
- ('dzdo', _('DZDO')),
- ('pmrun', _('Pmrun')),
- #('runas', _('Runas')),
- ]
-
- PASSWORD_FIELDS = ('password', 'security_token', 'ssh_key_data', 'ssh_key_unlock',
- 'become_password', 'vault_password', 'secret', 'authorize_password')
-
class Meta:
app_label = 'main'
- ordering = ('kind', 'name')
- unique_together = (('organization', 'name', 'kind'),)
+ ordering = ('name',)
+ unique_together = (('organization', 'name', 'credential_type'))
+
+ PASSWORD_FIELDS = ['inputs']
deprecated_user = models.ForeignKey(
'auth.User',
@@ -79,6 +224,14 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
on_delete=models.CASCADE,
related_name='deprecated_credentials',
)
+ credential_type = models.ForeignKey(
+ 'CredentialType',
+ related_name='credentials',
+ help_text=_('Type for this credential. Credential Types define '
+ 'valid fields (e.g,. "username", "password") and their '
+ 'properties (e.g,. "username is required" or "password '
+ 'should be stored with encryption").')
+ )
organization = models.ForeignKey(
'Organization',
null=True,
@@ -87,130 +240,13 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
on_delete=models.CASCADE,
related_name='credentials',
)
- kind = models.CharField(
- max_length=32,
- choices=KIND_CHOICES,
- default='ssh',
- )
- cloud = models.BooleanField(
- default=False,
- editable=False,
- )
- host = models.CharField(
- blank=True,
- default='',
- max_length=1024,
- verbose_name=_('Host'),
- help_text=_('The hostname or IP address to use.'),
- )
- username = models.CharField(
+ inputs = CredentialInputField(
blank=True,
- default='',
- max_length=1024,
- verbose_name=_('Username'),
- help_text=_('Username for this credential.'),
- )
- password = models.CharField(
- blank=True,
- default='',
- max_length=1024,
- verbose_name=_('Password'),
- help_text=_('Password for this credential (or "ASK" to prompt the '
- 'user for machine credentials).'),
- )
- security_token = models.CharField(
- blank=True,
- default='',
- max_length=1024,
- verbose_name=_('Security Token'),
- help_text=_('Security Token for this credential'),
- )
- project = models.CharField(
- blank=True,
- default='',
- max_length=100,
- verbose_name=_('Project'),
- help_text=_('The identifier for the project.'),
- )
- domain = models.CharField(
- blank=True,
- default='',
- max_length=100,
- verbose_name=_('Domain'),
- help_text=_('The identifier for the domain.'),
- )
- ssh_key_data = models.TextField(
- blank=True,
- default='',
- verbose_name=_('SSH private key'),
- help_text=_('RSA or DSA private key to be used instead of password.'),
- )
- ssh_key_unlock = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- verbose_name=_('SSH key unlock'),
- help_text=_('Passphrase to unlock SSH private key if encrypted (or '
- '"ASK" to prompt the user for machine credentials).'),
- )
- become_method = models.CharField(
- max_length=32,
- blank=True,
- default='',
- choices=BECOME_METHOD_CHOICES,
- help_text=_('Privilege escalation method.')
- )
- become_username = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- help_text=_('Privilege escalation username.'),
- )
- become_password = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- help_text=_('Password for privilege escalation method.')
- )
- vault_password = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- help_text=_('Vault password (or "ASK" to prompt the user).'),
- )
- authorize = models.BooleanField(
- default=False,
- help_text=_('Whether to use the authorize mechanism.'),
- )
- authorize_password = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- help_text=_('Password used by the authorize mechanism.'),
- )
- client = models.CharField(
- max_length=128,
- blank=True,
- default='',
- help_text=_('Client Id or Application Id for the credential'),
- )
- secret = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- help_text=_('Secret Token for this credential'),
- )
- subscription = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- help_text=_('Subscription identifier for this credential'),
- )
- tenant = models.CharField(
- max_length=1024,
- blank=True,
- default='',
- help_text=_('Tenant identifier for this credential'),
+ default={},
+ help_text=_('Data structure used to specify input values (e.g., '
+ '{"username": "jane-doe", "password": "secret"}). Valid '
+ 'fields and their requirements vary depending on the '
+ 'fields defined on the chosen CredentialType.')
)
admin_role = ImplicitRoleField(
parent_role=[
@@ -230,9 +266,50 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
'admin_role',
])
+ def __getattr__(self, item):
+ if item in V1Credential.FIELDS:
+ return self.inputs.get(item, V1Credential.FIELDS[item].default)
+ elif item in self.inputs:
+ return self.inputs[item]
+ raise AttributeError(item)
+
+ def __setattr__(self, item, value):
+ if item in V1Credential.FIELDS and item in self.credential_type.defined_fields:
+ if value:
+ self.inputs[item] = value
+ elif item in self.inputs:
+ del self.inputs[item]
+ return
+ super(Credential, self).__setattr__(item, value)
+
+ @property
+ def kind(self):
+ # TODO: remove the need for this helper property by removing its usage
+ # throughout the codebase
+ type_ = self.credential_type
+ if type_.kind != 'cloud':
+ return type_.kind
+ for field in V1Credential.KIND_CHOICES:
+ kind, name = field
+ if name == type_.name:
+ return kind
+
+ @property
+ def cloud(self):
+ return self.credential_type.kind == 'cloud'
+
+ def get_absolute_url(self, request=None):
+ return reverse('api:credential_detail', kwargs={'pk': self.pk}, request=request)
+
+ #
+ # TODO: the SSH-related properties below are largely used for validation
+ # and for determining passwords necessary for job/ad-hoc launch
+ #
+ # These are SSH-specific; should we move them elsewhere?
+ #
@property
def needs_ssh_password(self):
- return self.kind == 'ssh' and self.password == 'ASK'
+ return self.kind == 'machine' and self.password == 'ASK'
@property
def has_encrypted_ssh_key_data(self):
@@ -251,17 +328,17 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
@property
def needs_ssh_key_unlock(self):
- if self.kind == 'ssh' and self.ssh_key_unlock in ('ASK', ''):
+ if self.kind == 'machine' and self.ssh_key_unlock in ('ASK', ''):
return self.has_encrypted_ssh_key_data
return False
@property
def needs_become_password(self):
- return self.kind == 'ssh' and self.become_password == 'ASK'
+ return self.kind == 'machine' and self.become_password == 'ASK'
@property
def needs_vault_password(self):
- return self.kind == 'ssh' and self.vault_password == 'ASK'
+ return self.kind == 'machine' and self.vault_password == 'ASK'
@property
def passwords_needed(self):
@@ -271,54 +348,10 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
needed.append(field)
return needed
- def get_absolute_url(self, request=None):
- return reverse('api:credential_detail', kwargs={'pk': self.pk}, request=request)
-
- def clean_host(self):
- """Ensure that if this is a type of credential that requires a
- `host`, that a host is provided.
- """
- host = self.host or ''
- if not host and self.kind == 'vmware':
- raise ValidationError(_('Host required for VMware credential.'))
- if not host and self.kind == 'openstack':
- raise ValidationError(_('Host required for OpenStack credential.'))
- return host
-
- def clean_domain(self):
- return self.domain or ''
-
- def clean_username(self):
- username = self.username or ''
- if not username and self.kind == 'aws':
- raise ValidationError(_('Access key required for AWS credential.'))
- if not username and self.kind == 'rax':
- raise ValidationError(_('Username required for Rackspace '
- 'credential.'))
- if not username and self.kind == 'vmware':
- raise ValidationError(_('Username required for VMware credential.'))
- if not username and self.kind == 'openstack':
- raise ValidationError(_('Username required for OpenStack credential.'))
- return username
-
- def clean_password(self):
- password = self.password or ''
- if not password and self.kind == 'aws':
- raise ValidationError(_('Secret key required for AWS credential.'))
- if not password and self.kind == 'rax':
- raise ValidationError(_('API key required for Rackspace credential.'))
- if not password and self.kind == 'vmware':
- raise ValidationError(_('Password required for VMware credential.'))
- if not password and self.kind == 'openstack':
- raise ValidationError(_('Password or API key required for OpenStack credential.'))
- return password
-
- def clean_project(self):
- project = self.project or ''
- if self.kind == 'openstack' and not project:
- raise ValidationError(_('Project name required for OpenStack credential.'))
- return project
-
+ #
+ # TODO: all of these required fields should be captured in schema
+ # definitions and these clean_ methods should be removed
+ #
def clean_ssh_key_data(self):
if self.pk:
ssh_key_data = decrypt_field(self, 'ssh_key_data')
@@ -355,9 +388,10 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
raise ValidationError(_('Credential cannot be assigned to both a user and team.'))
def _password_field_allows_ask(self, field):
- return bool(self.kind == 'ssh' and field != 'ssh_key_data')
+ return bool(self.kind == 'machine' and field != 'ssh_key_data')
def save(self, *args, **kwargs):
+ inputs_before = {}
# If update_fields has been specified, add our field names to it,
# if hit hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
@@ -377,10 +411,507 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
self.deprecated_user = None
if 'deprecated_user' not in update_fields:
update_fields.append('deprecated_user')
- # Set cloud flag based on credential kind.
- cloud = self.kind in CLOUD_PROVIDERS + ('aws',)
- if self.cloud != cloud:
- self.cloud = cloud
- if 'cloud' not in update_fields:
- update_fields.append('cloud')
+
+ inputs_before = cred_before.inputs
+
+ self.PASSWORD_FIELDS = self.credential_type.secret_fields
+
+ if self.pk:
+ # Look up the currently persisted value so that we can replace
+ # $encrypted$ with the actual DB-backed value
+ for field in self.PASSWORD_FIELDS:
+ if self.inputs.get(field) == '$encrypted$':
+ self.inputs[field] = inputs_before[field]
+
super(Credential, self).save(*args, **kwargs)
+
+ def encrypt_field(self, field, ask):
+ encrypted = encrypt_field(self, field, ask=ask)
+ if encrypted:
+ self.inputs[field] = encrypted
+ elif field in self.inputs:
+ del self.inputs[field]
+
+ def mark_field_for_save(self, update_fields, field):
+ if field in self.credential_type.secret_fields:
+ # If we've encrypted a v1 field, we actually want to persist
+ # self.inputs
+ field = 'inputs'
+ super(Credential, self).mark_field_for_save(update_fields, field)
+
+
+class CredentialType(CommonModelNameNotUnique):
+ '''
+ A reusable schema for a credential.
+
+ Used to define a named credential type with fields (e.g., an API key) and
+ output injectors (i.e., an environment variable that uses the API key).
+ '''
+
+ defaults = OrderedDict()
+
+ class Meta:
+ app_label = 'main'
+ ordering = ('kind', 'name')
+ unique_together = (('name', 'kind'),)
+
+ KIND_CHOICES = (
+ ('machine', _('Machine')),
+ ('net', _('Network')),
+ ('scm', _('Source Control')),
+ ('cloud', _('Cloud'))
+ )
+
+ kind = models.CharField(
+ max_length=32,
+ choices=KIND_CHOICES
+ )
+ managed_by_tower = models.BooleanField(
+ default=False,
+ editable=False
+ )
+ inputs = CredentialTypeInputField(
+ blank=True,
+ default={}
+ )
+ injectors = CredentialTypeInjectorField(
+ blank=True,
+ default={}
+ )
+
+ def get_absolute_url(self, request=None):
+ return reverse('api:credential_type_detail', kwargs={'pk': self.pk}, request=request)
+
+ @property
+ def unique_by_kind(self):
+ return self.kind != 'cloud'
+
+ @property
+ def defined_fields(self):
+ return [field.get('id') for field in self.inputs.get('fields', [])]
+
+ @property
+ def secret_fields(self):
+ return [
+ field['id'] for field in self.inputs.get('fields', [])
+ if field.get('secret', False) is True
+ ]
+
+ @classmethod
+ def default(cls, f):
+ func = functools.partial(f, cls)
+ cls.defaults[f.__name__] = func
+ return func
+
+ @classmethod
+ def setup_tower_managed_defaults(cls, persisted=True):
+ for default in cls.defaults.values():
+ default_ = default()
+ if persisted:
+ default_.save()
+
+ @classmethod
+ def from_v1_kind(cls, kind, data={}):
+ match = None
+ kind = kind or 'ssh'
+ kind_choices = dict(V1Credential.KIND_CHOICES)
+ requirements = {}
+ if kind == 'ssh':
+ if 'vault_password' in data:
+ requirements.update(dict(
+ kind='machine',
+ name='Vault'
+ ))
+ else:
+ requirements.update(dict(
+ kind='machine',
+ name='SSH'
+ ))
+ elif kind in ('net', 'scm'):
+ requirements['kind'] = kind
+ elif kind in kind_choices:
+ requirements.update(dict(
+ kind='cloud',
+ name=kind_choices[kind]
+ ))
+ if requirements:
+ requirements['managed_by_tower'] = True
+ match = cls.objects.filter(**requirements)[:1].get()
+ return match
+
+
+@CredentialType.default
+def ssh(cls):
+ return cls(
+ kind='machine',
+ name='SSH',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ 'ask_at_runtime': True
+ }, {
+ 'id': 'ssh_key_data',
+ 'label': 'SSH Private Key',
+ 'type': 'string',
+ 'secret': True,
+ 'multiline': True
+ }, {
+ 'id': 'ssh_key_unlock',
+ 'label': 'Private Key Passphrase',
+ 'type': 'string',
+ 'secret': True,
+ 'ask_at_runtime': True
+ }, {
+ 'id': 'become_method',
+ 'label': 'Privilege Escalation Method',
+ 'choices': map(operator.itemgetter(0),
+ V1Credential.FIELDS['become_method'].choices)
+ }, {
+ 'id': 'become_username',
+ 'label': 'Privilege Escalation Username',
+ 'type': 'string',
+ }, {
+ 'id': 'become_password',
+ 'label': 'Privilege Escalation Password',
+ 'type': 'string',
+ 'secret': True,
+ 'ask_at_runtime': True
+ }]
+ }
+ )
+
+
+@CredentialType.default
+def scm(cls):
+ return cls(
+ kind='scm',
+ name='Source Control',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True
+ }, {
+ 'id': 'ssh_key_data',
+ 'label': 'SCM Private Key',
+ 'type': 'string',
+ 'secret': True,
+ 'multiline': True
+ }, {
+ 'id': 'ssh_key_unlock',
+ 'label': 'Private Key Passphrase',
+ 'type': 'string',
+ 'secret': True
+ }]
+ }
+ )
+
+
+@CredentialType.default
+def vault(cls):
+ return cls(
+ kind='machine',
+ name='Vault',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'vault_password',
+ 'label': 'Vault Password',
+ 'type': 'string',
+ 'secret': True,
+ 'ask_at_runtime': True
+ }],
+ }
+ )
+
+
+@CredentialType.default
+def net(cls):
+ return cls(
+ kind='net',
+ name='Network',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }, {
+ 'id': 'ssh_key_data',
+ 'label': 'SSH Private Key',
+ 'type': 'string',
+ 'secret': True,
+ 'multiline': True
+ }, {
+ 'id': 'ssh_key_unlock',
+ 'label': 'Private Key Passphrase',
+ 'type': 'string',
+ 'secret': True,
+ }, {
+ 'id': 'authorize_password',
+ 'label': 'Authorize Password',
+ 'type': 'string',
+ 'secret': True,
+ }]
+ }
+ )
+
+
+@CredentialType.default
+def aws(cls):
+ return cls(
+ kind='cloud',
+ name='Amazon Web Services',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }, {
+ 'id': 'security_token',
+ 'label': 'STS Token',
+ 'type': 'string',
+ 'secret': True,
+ }],
+ 'required': ['username', 'password']
+ }
+ )
+
+
+@CredentialType.default
+def openstack(cls):
+ return cls(
+ kind='cloud',
+ name='OpenStack',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }, {
+ 'id': 'host',
+ 'label': 'Host',
+ 'type': 'string',
+ }, {
+ 'id': 'project',
+ 'label': 'Project',
+ 'type': 'string',
+ }],
+ 'required': ['username', 'password', 'host', 'project']
+ }
+ )
+
+
+@CredentialType.default
+def rackspace(cls):
+ return cls(
+ kind='cloud',
+ name='Rackspace',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }],
+ 'required': ['username', 'password']
+ }
+ )
+
+
+@CredentialType.default
+def vmware(cls):
+ return cls(
+ kind='cloud',
+ name='VMware vCenter',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'host',
+ 'label': 'VCenter Host',
+ 'type': 'string',
+ }, {
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }],
+ 'required': ['host', 'username', 'password']
+ }
+ )
+
+
+@CredentialType.default
+def satellite6(cls):
+ return cls(
+ kind='cloud',
+ name='Red Hat Satellite 6',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'host',
+ 'label': 'Satellite 6 URL',
+ 'type': 'string',
+ }, {
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }]
+ }
+ )
+
+
+@CredentialType.default
+def cloudforms(cls):
+ return cls(
+ kind='cloud',
+ name='Red Hat CloudForms',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'host',
+ 'label': 'CloudForms URL',
+ 'type': 'string',
+ }, {
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }]
+ }
+ )
+
+
+@CredentialType.default
+def gce(cls):
+ return cls(
+ kind='cloud',
+ name='Google Compute Engine',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Service Account Email Address',
+ 'type': 'string'
+ }, {
+ 'id': 'project',
+ 'label': 'Project',
+ 'type': 'string'
+ }, {
+ 'id': 'ssh_key_data',
+ 'label': 'RSA Private Key',
+ 'type': 'string',
+ 'secret': True,
+ 'multiline': True
+ }]
+ }
+ )
+
+
+@CredentialType.default
+def azure(cls):
+ return cls(
+ kind='cloud',
+ name='Microsoft Azure Classic (deprecated)',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Subscription ID',
+ 'type': 'string'
+ }, {
+ 'id': 'ssh_key_data',
+ 'label': 'Management Certificate',
+ 'type': 'string',
+ 'secret': True,
+ 'multiline': True
+ }]
+ }
+ )
+
+
+@CredentialType.default
+def azure_rm(cls):
+ return cls(
+ kind='cloud',
+ name='Microsoft Azure Resource Manager',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'subscription',
+ 'label': 'Subscription ID',
+ 'type': 'string'
+ }, {
+ 'id': 'username',
+ 'label': 'Username',
+ 'type': 'string'
+ }, {
+ 'id': 'password',
+ 'label': 'Password',
+ 'type': 'string',
+ 'secret': True,
+ }, {
+ 'id': 'client',
+ 'label': 'Client ID',
+ 'type': 'string'
+ }, {
+ 'id': 'secret',
+ 'label': 'Client Secret',
+ 'type': 'string',
+ 'secret': True,
+ }, {
+ 'id': 'tenant',
+ 'label': 'Tenant ID',
+ 'type': 'string'
+ }]
+ }
+ )
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index 72791c5c7e..73a3ce6bdd 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -156,7 +156,7 @@ class JobOptions(BaseModel):
def clean_credential(self):
cred = self.credential
- if cred and cred.kind != 'ssh':
+ if cred and cred.kind != 'machine':
raise ValidationError(
_('You must provide a machine / SSH credential.'),
)
diff --git a/awx/main/tests/factories/fixtures.py b/awx/main/tests/factories/fixtures.py
index f7c7054f32..4ec373191f 100644
--- a/awx/main/tests/factories/fixtures.py
+++ b/awx/main/tests/factories/fixtures.py
@@ -10,6 +10,7 @@ from awx.main.models import (
JobTemplate,
Job,
NotificationTemplate,
+ CredentialType,
Credential,
Inventory,
Label,
@@ -84,8 +85,14 @@ def mk_project(name, organization=None, description=None, persisted=True):
return project
-def mk_credential(name, cloud=False, kind='ssh', persisted=True):
- cred = Credential(name=name, cloud=cloud, kind=kind)
+def mk_credential(name, credential_type='ssh', persisted=True):
+ type_ = CredentialType.defaults[credential_type]()
+ if persisted:
+ type_.save()
+ cred = Credential(
+ credential_type=type_,
+ name=name
+ )
if persisted:
cred.save()
return cred
diff --git a/awx/main/tests/factories/tower.py b/awx/main/tests/factories/tower.py
index 975adde43b..0536afc693 100644
--- a/awx/main/tests/factories/tower.py
+++ b/awx/main/tests/factories/tower.py
@@ -213,12 +213,12 @@ def create_job_template(name, roles=None, persisted=True, **kwargs):
if 'cloud_credential' in kwargs:
cloud_cred = kwargs['cloud_credential']
if type(cloud_cred) is not Credential:
- cloud_cred = mk_credential(cloud_cred, kind='aws', persisted=persisted)
+ cloud_cred = mk_credential(cloud_cred, credential_type='aws', persisted=persisted)
if 'network_credential' in kwargs:
net_cred = kwargs['network_credential']
if type(net_cred) is not Credential:
- net_cred = mk_credential(net_cred, kind='net', persisted=persisted)
+ net_cred = mk_credential(net_cred, credential_type='net', persisted=persisted)
if 'project' in kwargs:
proj = kwargs['project']
diff --git a/awx/main/tests/functional/api/test_credential.py b/awx/main/tests/functional/api/test_credential.py
index b51d6a6dfb..2f1fdee97d 100644
--- a/awx/main/tests/functional/api/test_credential.py
+++ b/awx/main/tests/functional/api/test_credential.py
@@ -1,70 +1,139 @@
+import json
import mock # noqa
import pytest
+from awx.main.models.credential import Credential, CredentialType
+from awx.main.utils.common import decrypt_field
from awx.api.versioning import reverse
+@pytest.mark.django_db
+@pytest.mark.parametrize('kind, total', [
+ ('ssh', 1), ('net', 0)
+])
+def test_filter_by_v1_kind(get, admin, organization, kind, total):
+ CredentialType.setup_tower_managed_defaults()
+ cred = Credential(
+ credential_type=CredentialType.from_v1_kind('ssh'),
+ name='Best credential ever',
+ organization=organization,
+ inputs={
+ 'username': u'jim',
+ 'password': u'secret'
+ }
+ )
+ cred.save()
+
+ response = get(
+ reverse('api:credential_list', kwargs={'version': 'v1'}),
+ admin,
+ QUERY_STRING='kind=%s' % kind
+ )
+ assert response.status_code == 200
+ assert response.data['count'] == total
+
+
+@pytest.mark.django_db
+def test_filter_by_v1_invalid_kind(get, admin, organization):
+ response = get(
+ reverse('api:credential_list', kwargs={'version': 'v1'}),
+ admin,
+ QUERY_STRING='kind=bad_kind'
+ )
+ assert response.status_code == 400
+
+
#
# user credential creation
#
@pytest.mark.django_db
-def test_create_user_credential_via_credentials_list(post, get, alice):
- response = post(reverse('api:credential_list'), {
- 'user': alice.id,
- 'name': 'Some name',
- 'username': 'someusername'
- }, alice)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_user_credential_via_credentials_list(post, get, alice, credentialtype_ssh, version, params):
+ params['user'] = alice.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ alice
+ )
assert response.status_code == 201
- response = get(reverse('api:credential_list'), alice)
+ response = get(reverse('api:credential_list', kwargs={'version': version}), alice)
assert response.status_code == 200
assert response.data['count'] == 1
@pytest.mark.django_db
-def test_credential_validation_error_with_bad_user(post, admin):
- response = post(reverse('api:credential_list'), {
- 'user': 'asdf',
- 'name': 'Some name',
- 'username': 'someusername'
- }, admin)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_credential_validation_error_with_bad_user(post, admin, version, credentialtype_ssh, params):
+ params['user'] = 'asdf'
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
assert response.status_code == 400
assert response.data['user'][0] == 'Incorrect type. Expected pk value, received unicode.'
@pytest.mark.django_db
-def test_create_user_credential_via_user_credentials_list(post, get, alice):
- response = post(reverse('api:user_credentials_list', kwargs={'pk': alice.pk}), {
- 'user': alice.pk,
- 'name': 'Some name',
- 'username': 'someusername',
- }, alice)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_user_credential_via_user_credentials_list(post, get, alice, credentialtype_ssh, version, params):
+ params['user'] = alice.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:user_credentials_list', kwargs={'version': version, 'pk': alice.pk}),
+ params,
+ alice
+ )
assert response.status_code == 201
- response = get(reverse('api:user_credentials_list', kwargs={'pk': alice.pk}), alice)
+ response = get(reverse('api:user_credentials_list', kwargs={'version': version, 'pk': alice.pk}), alice)
assert response.status_code == 200
assert response.data['count'] == 1
@pytest.mark.django_db
-def test_create_user_credential_via_credentials_list_xfail(post, alice, bob):
- response = post(reverse('api:credential_list'), {
- 'user': bob.id,
- 'name': 'Some name',
- 'username': 'someusername'
- }, alice)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_user_credential_via_credentials_list_xfail(post, alice, bob, version, params):
+ params['user'] = bob.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ alice
+ )
assert response.status_code == 403
@pytest.mark.django_db
-def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob):
- response = post(reverse('api:user_credentials_list', kwargs={'pk': bob.pk}), {
- 'user': bob.pk,
- 'name': 'Some name',
- 'username': 'someusername'
- }, alice)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob, version, params):
+ params['user'] = bob.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:user_credentials_list', kwargs={'version': version, 'pk': bob.pk}),
+ params,
+ alice
+ )
assert response.status_code == 403
@@ -74,15 +143,24 @@ def test_create_user_credential_via_user_credentials_list_xfail(post, alice, bob
@pytest.mark.django_db
-def test_create_team_credential(post, get, team, organization, org_admin, team_member):
- response = post(reverse('api:credential_list'), {
- 'team': team.id,
- 'name': 'Some name',
- 'username': 'someusername'
- }, org_admin)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_team_credential(post, get, team, organization, org_admin, team_member, credentialtype_ssh, version, params):
+ params['team'] = team.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ org_admin
+ )
assert response.status_code == 201
- response = get(reverse('api:team_credentials_list', kwargs={'pk': team.pk}), team_member)
+ response = get(
+ reverse('api:team_credentials_list', kwargs={'version': version, 'pk': team.pk}),
+ team_member
+ )
assert response.status_code == 200
assert response.data['count'] == 1
@@ -91,39 +169,60 @@ def test_create_team_credential(post, get, team, organization, org_admin, team_m
@pytest.mark.django_db
-def test_create_team_credential_via_team_credentials_list(post, get, team, org_admin, team_member):
- response = post(reverse('api:team_credentials_list', kwargs={'pk': team.pk}), {
- 'team': team.pk,
- 'name': 'Some name',
- 'username': 'someusername',
- }, org_admin)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_team_credential_via_team_credentials_list(post, get, team, org_admin, team_member, credentialtype_ssh, version, params):
+ params['team'] = team.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:team_credentials_list', kwargs={'version': version, 'pk': team.pk}),
+ params,
+ org_admin
+ )
assert response.status_code == 201
- response = get(reverse('api:team_credentials_list', kwargs={'pk': team.pk}), team_member)
+ response = get(
+ reverse('api:team_credentials_list', kwargs={'version': version, 'pk': team.pk}),
+ team_member
+ )
assert response.status_code == 200
assert response.data['count'] == 1
@pytest.mark.django_db
-def test_create_team_credential_by_urelated_user_xfail(post, team, organization, alice, team_member):
- response = post(reverse('api:credential_list'), {
- 'team': team.id,
- 'organization': organization.id,
- 'name': 'Some name',
- 'username': 'someusername'
- }, alice)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_team_credential_by_urelated_user_xfail(post, team, organization, alice, team_member, version, params):
+ params['team'] = team.id
+ params['organization'] = organization.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ alice
+ )
assert response.status_code == 403
@pytest.mark.django_db
-def test_create_team_credential_by_team_member_xfail(post, team, organization, alice, team_member):
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_team_credential_by_team_member_xfail(post, team, organization, alice, team_member, version, params):
# Members can't add credentials, only org admins.. for now?
- response = post(reverse('api:credential_list'), {
- 'team': team.id,
- 'organization': organization.id,
- 'name': 'Some name',
- 'username': 'someusername'
- }, team_member)
+ params['team'] = team.id
+ params['organization'] = organization.id
+ params['name'] = 'Some name'
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ team_member
+ )
assert response.status_code == 403
@@ -133,109 +232,120 @@ def test_create_team_credential_by_team_member_xfail(post, team, organization, a
@pytest.mark.django_db
-def test_grant_org_credential_to_org_user_through_role_users(post, credential, organization, org_admin, org_member):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_org_credential_to_org_user_through_role_users(post, credential, organization, org_admin, org_member, version):
credential.organization = organization
credential.save()
- response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {
+ response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
'id': org_member.id
}, org_admin)
assert response.status_code == 204
@pytest.mark.django_db
-def test_grant_org_credential_to_org_user_through_user_roles(post, credential, organization, org_admin, org_member):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_org_credential_to_org_user_through_user_roles(post, credential, organization, org_admin, org_member, version):
credential.organization = organization
credential.save()
- response = post(reverse('api:user_roles_list', kwargs={'pk': org_member.id}), {
+ response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': org_member.id}), {
'id': credential.use_role.id
}, org_admin)
assert response.status_code == 204
@pytest.mark.django_db
-def test_grant_org_credential_to_non_org_user_through_role_users(post, credential, organization, org_admin, alice):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_org_credential_to_non_org_user_through_role_users(post, credential, organization, org_admin, alice, version):
credential.organization = organization
credential.save()
- response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {
+ response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
'id': alice.id
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
-def test_grant_org_credential_to_non_org_user_through_user_roles(post, credential, organization, org_admin, alice):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_org_credential_to_non_org_user_through_user_roles(post, credential, organization, org_admin, alice, version):
credential.organization = organization
credential.save()
- response = post(reverse('api:user_roles_list', kwargs={'pk': alice.id}), {
+ response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': alice.id}), {
'id': credential.use_role.id
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
-def test_grant_private_credential_to_user_through_role_users(post, credential, alice, bob):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_private_credential_to_user_through_role_users(post, credential, alice, bob, version):
# normal users can't do this
credential.admin_role.members.add(alice)
- response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {
+ response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
'id': bob.id
}, alice)
assert response.status_code == 400
@pytest.mark.django_db
-def test_grant_private_credential_to_org_user_through_role_users(post, credential, org_admin, org_member):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_private_credential_to_org_user_through_role_users(post, credential, org_admin, org_member, version):
# org admins can't either
credential.admin_role.members.add(org_admin)
- response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {
+ response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
'id': org_member.id
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
-def test_sa_grant_private_credential_to_user_through_role_users(post, credential, admin, bob):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_sa_grant_private_credential_to_user_through_role_users(post, credential, admin, bob, version):
# but system admins can
- response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {
+ response = post(reverse('api:role_users_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
'id': bob.id
}, admin)
assert response.status_code == 204
@pytest.mark.django_db
-def test_grant_private_credential_to_user_through_user_roles(post, credential, alice, bob):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_private_credential_to_user_through_user_roles(post, credential, alice, bob, version):
# normal users can't do this
credential.admin_role.members.add(alice)
- response = post(reverse('api:user_roles_list', kwargs={'pk': bob.id}), {
+ response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': bob.id}), {
'id': credential.use_role.id
}, alice)
assert response.status_code == 400
@pytest.mark.django_db
-def test_grant_private_credential_to_org_user_through_user_roles(post, credential, org_admin, org_member):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_private_credential_to_org_user_through_user_roles(post, credential, org_admin, org_member, version):
# org admins can't either
credential.admin_role.members.add(org_admin)
- response = post(reverse('api:user_roles_list', kwargs={'pk': org_member.id}), {
+ response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': org_member.id}), {
'id': credential.use_role.id
}, org_admin)
assert response.status_code == 400
@pytest.mark.django_db
-def test_sa_grant_private_credential_to_user_through_user_roles(post, credential, admin, bob):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_sa_grant_private_credential_to_user_through_user_roles(post, credential, admin, bob, version):
# but system admins can
- response = post(reverse('api:user_roles_list', kwargs={'pk': bob.id}), {
+ response = post(reverse('api:user_roles_list', kwargs={'version': version, 'pk': bob.id}), {
'id': credential.use_role.id
}, admin)
assert response.status_code == 204
@pytest.mark.django_db
-def test_grant_org_credential_to_team_through_role_teams(post, credential, organization, org_admin, org_auditor, team):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_org_credential_to_team_through_role_teams(post, credential, organization, org_admin, org_auditor, team, version):
assert org_auditor not in credential.read_role
credential.organization = organization
credential.save()
- response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {
+ response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
'id': team.id
}, org_admin)
assert response.status_code == 204
@@ -243,11 +353,12 @@ def test_grant_org_credential_to_team_through_role_teams(post, credential, organ
@pytest.mark.django_db
-def test_grant_org_credential_to_team_through_team_roles(post, credential, organization, org_admin, org_auditor, team):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_grant_org_credential_to_team_through_team_roles(post, credential, organization, org_admin, org_auditor, team, version):
assert org_auditor not in credential.read_role
credential.organization = organization
credential.save()
- response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {
+ response = post(reverse('api:team_roles_list', kwargs={'version': version, 'pk': team.id}), {
'id': credential.use_role.id
}, org_admin)
assert response.status_code == 204
@@ -255,18 +366,20 @@ def test_grant_org_credential_to_team_through_team_roles(post, credential, organ
@pytest.mark.django_db
-def test_sa_grant_private_credential_to_team_through_role_teams(post, credential, admin, team):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_sa_grant_private_credential_to_team_through_role_teams(post, credential, admin, team, version):
# not even a system admin can grant a private cred to a team though
- response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {
+ response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': credential.use_role.id}), {
'id': team.id
}, admin)
assert response.status_code == 400
@pytest.mark.django_db
-def test_sa_grant_private_credential_to_team_through_team_roles(post, credential, admin, team):
+@pytest.mark.parametrize('version', ['v1', 'v2'])
+def test_sa_grant_private_credential_to_team_through_team_roles(post, credential, admin, team, version):
# not even a system admin can grant a private cred to a team though
- response = post(reverse('api:role_teams_list', kwargs={'pk': team.id}), {
+ response = post(reverse('api:role_teams_list', kwargs={'version': version, 'pk': team.id}), {
'id': credential.use_role.id
}, admin)
assert response.status_code == 400
@@ -278,34 +391,55 @@ def test_sa_grant_private_credential_to_team_through_team_roles(post, credential
@pytest.mark.django_db
-def test_create_org_credential_as_not_admin(post, organization, org_member):
- response = post(reverse('api:credential_list'), {
- 'name': 'Some name',
- 'username': 'someusername',
- 'organization': organization.id,
- }, org_member)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_org_credential_as_not_admin(post, organization, org_member, credentialtype_ssh, version, params):
+ params['name'] = 'Some name'
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list'),
+ params,
+ org_member
+ )
assert response.status_code == 403
@pytest.mark.django_db
-def test_create_org_credential_as_admin(post, organization, org_admin):
- response = post(reverse('api:credential_list'), {
- 'name': 'Some name',
- 'username': 'someusername',
- 'organization': organization.id,
- }, org_admin)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_create_org_credential_as_admin(post, organization, org_admin, credentialtype_ssh, version, params):
+ params['name'] = 'Some name'
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list'),
+ params,
+ org_admin
+ )
assert response.status_code == 201
@pytest.mark.django_db
-def test_credential_detail(post, get, organization, org_admin):
- response = post(reverse('api:credential_list'), {
- 'name': 'Some name',
- 'username': 'someusername',
- 'organization': organization.id,
- }, org_admin)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_credential_detail(post, get, organization, org_admin, credentialtype_ssh, version, params):
+ params['name'] = 'Some name'
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list'),
+ params,
+ org_admin
+ )
assert response.status_code == 201
- response = get(reverse('api:credential_detail', kwargs={'pk': response.data['id']}), org_admin)
+ response = get(
+ reverse('api:credential_detail', kwargs={'version': version, 'pk': response.data['id']}),
+ org_admin
+ )
assert response.status_code == 200
summary_fields = response.data['summary_fields']
assert 'organization' in summary_fields
@@ -314,79 +448,901 @@ def test_credential_detail(post, get, organization, org_admin):
@pytest.mark.django_db
-def test_list_created_org_credentials(post, get, organization, org_admin, org_member):
- response = post(reverse('api:credential_list'), {
- 'name': 'Some name',
- 'username': 'someusername',
- 'organization': organization.id,
- }, org_admin)
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'username': 'someusername'}],
+ ['v2', {'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
+def test_list_created_org_credentials(post, get, organization, org_admin, org_member, credentialtype_ssh, version, params):
+ params['name'] = 'Some name'
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ org_admin
+ )
assert response.status_code == 201
- response = get(reverse('api:credential_list'), org_admin)
+ response = get(
+ reverse('api:credential_list', kwargs={'version': version}),
+ org_admin
+ )
assert response.status_code == 200
assert response.data['count'] == 1
- response = get(reverse('api:credential_list'), org_member)
+ response = get(
+ reverse('api:credential_list', kwargs={'version': version}),
+ org_member
+ )
assert response.status_code == 200
assert response.data['count'] == 0
- response = get(reverse('api:organization_credential_list', kwargs={'pk': organization.pk}), org_admin)
+ response = get(
+ reverse('api:organization_credential_list', kwargs={'version': version, 'pk': organization.pk}),
+ org_admin
+ )
assert response.status_code == 200
assert response.data['count'] == 1
- response = get(reverse('api:organization_credential_list', kwargs={'pk': organization.pk}), org_member)
+ response = get(
+ reverse('api:organization_credential_list', kwargs={'version': version, 'pk': organization.pk}),
+ org_member
+ )
assert response.status_code == 200
assert response.data['count'] == 0
@pytest.mark.parametrize('order_by', ('password', '-password', 'password,pk', '-password,pk'))
+@pytest.mark.parametrize('version', ('v1', 'v2'))
@pytest.mark.django_db
-def test_list_cannot_order_by_encrypted_field(post, get, organization, org_admin, order_by):
+def test_list_cannot_order_by_encrypted_field(post, get, organization, org_admin, credentialtype_ssh, order_by, version):
for i, password in enumerate(('abc', 'def', 'xyz')):
- response = post(reverse('api:credential_list'), {
- 'organization': organization.id,
- 'name': 'C%d' % i,
- 'password': password
- }, org_admin)
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ {
+ 'organization': organization.id,
+ 'name': 'C%d' % i,
+ 'password': password
+ },
+ org_admin
+ )
+
+ response = get(
+ reverse('api:credential_list', kwargs={'version': version}),
+ org_admin,
+ QUERY_STRING='order_by=%s' % order_by,
+ status=400
+ )
+ assert response.status_code == 400
- response = get(reverse('api:credential_list'), org_admin,
- QUERY_STRING='order_by=%s' % order_by, status=400)
+
+@pytest.mark.django_db
+def test_inputs_cannot_contain_extra_fields(get, post, organization, admin, credentialtype_ssh):
+ params = {
+ 'name': 'Best credential ever',
+ 'organization': organization.id,
+ 'credential_type': credentialtype_ssh.pk,
+ 'inputs': {
+ 'invalid_field': 'foo'
+ },
+ }
+ response = post(
+ reverse('api:credential_list', kwargs={'version': 'v2'}),
+ params,
+ admin
+ )
assert response.status_code == 400
+ assert "'invalid_field' was unexpected" in json.dumps(response.data)
#
-# Openstack Credentials
+# SCM Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'scm',
+ 'name': 'Best credential ever',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'ssh_key_data': 'some_key_data',
+ 'ssh_key_unlock': 'some_key_unlock',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'ssh_key_data': 'some_key_data',
+ 'ssh_key_unlock': 'some_key_unlock',
+ }
+ }]
+])
+def test_scm_create_ok(post, organization, admin, version, params):
+ scm = CredentialType.defaults['scm']()
+ scm.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+ assert decrypt_field(cred, 'ssh_key_data') == 'some_key_data'
+ assert decrypt_field(cred, 'ssh_key_unlock') == 'some_key_unlock'
+
+
#
+# Vault Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'ssh',
+ 'name': 'Best credential ever',
+ 'vault_password': 'some_password',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'vault_password': 'some_password',
+ }
+ }]
+])
+def test_vault_create_ok(post, organization, admin, version, params):
+ vault = CredentialType.defaults['vault']()
+ vault.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert decrypt_field(cred, 'vault_password') == 'some_password'
+
+#
+# Net Credentials
+#
@pytest.mark.django_db
-def test_openstack_create_ok(post, organization, admin):
- data = {
- 'kind': 'openstack',
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'net',
'name': 'Best credential ever',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'ssh_key_data': 'some_key_data',
+ 'ssh_key_unlock': 'some_key_unlock',
+ 'authorize_password': 'some_authorize_password',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'ssh_key_data': 'some_key_data',
+ 'ssh_key_unlock': 'some_key_unlock',
+ 'authorize_password': 'some_authorize_password',
+ }
+ }]
+])
+def test_net_create_ok(post, organization, admin, version, params):
+ net = CredentialType.defaults['net']()
+ net.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+ assert decrypt_field(cred, 'ssh_key_data') == 'some_key_data'
+ assert decrypt_field(cred, 'ssh_key_unlock') == 'some_key_unlock'
+ assert decrypt_field(cred, 'authorize_password') == 'some_authorize_password'
+
+
+#
+# Cloudforms Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'cloudforms',
+ 'name': 'Best credential ever',
+ 'host': 'some_host',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'host': 'some_host',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ }
+ }]
+])
+def test_cloudforms_create_ok(post, organization, admin, version, params):
+ cloudforms = CredentialType.defaults['cloudforms']()
+ cloudforms.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['host'] == 'some_host'
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+
+
+#
+# GCE Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'gce',
+ 'name': 'Best credential ever',
+ 'username': 'some_username',
+ 'project': 'some_project',
+ 'ssh_key_data': 'XYZ'
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'username': 'some_username',
+ 'project': 'some_project',
+ 'ssh_key_data': 'XYZ'
+ }
+ }]
+])
+def test_gce_create_ok(post, organization, admin, version, params):
+ gce = CredentialType.defaults['gce']()
+ gce.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'some_username'
+ assert cred.inputs['project'] == 'some_project'
+ assert decrypt_field(cred, 'ssh_key_data') == 'XYZ'
+
+
+#
+# Azure Classic
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'azure',
+ 'name': 'Best credential ever',
+ 'username': 'some_username',
+ 'ssh_key_data': 'XYZ'
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'username': 'some_username',
+ 'ssh_key_data': 'XYZ'
+ }
+ }]
+])
+def test_azure_create_ok(post, organization, admin, version, params):
+ azure = CredentialType.defaults['azure']()
+ azure.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'ssh_key_data') == 'XYZ'
+
+
+#
+# Azure Resource Manager
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'azure_rm',
+ 'name': 'Best credential ever',
+ 'subscription': 'some_subscription',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'client': 'some_client',
+ 'secret': 'some_secret',
+ 'tenant': 'some_tenant'
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'subscription': 'some_subscription',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'client': 'some_client',
+ 'secret': 'some_secret',
+ 'tenant': 'some_tenant'
+ }
+ }]
+])
+def test_azure_rm_create_ok(post, organization, admin, version, params):
+ azure_rm = CredentialType.defaults['azure_rm']()
+ azure_rm.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['subscription'] == 'some_subscription'
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+ assert cred.inputs['client'] == 'some_client'
+ assert decrypt_field(cred, 'secret') == 'some_secret'
+ assert cred.inputs['tenant'] == 'some_tenant'
+
+
+#
+# RH Satellite6 Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'satellite6',
+ 'name': 'Best credential ever',
+ 'host': 'some_host',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'host': 'some_host',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ }
+ }]
+])
+def test_satellite6_create_ok(post, organization, admin, version, params):
+ sat6 = CredentialType.defaults['satellite6']()
+ sat6.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['host'] == 'some_host'
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+
+
+#
+# AWS Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'aws',
+ 'name': 'Best credential ever',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'security_token': 'abc123'
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'username': 'some_username',
+ 'password': 'some_password',
+ 'security_token': 'abc123'
+ }
+ }]
+])
+def test_aws_create_ok(post, organization, admin, version, params):
+ aws = CredentialType.defaults['aws']()
+ aws.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+ assert decrypt_field(cred, 'security_token') == 'abc123'
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'aws',
+ 'name': 'Best credential ever',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {}
+ }]
+])
+def test_aws_create_fail_required_fields(post, organization, admin, version, params):
+ aws = CredentialType.defaults['aws']()
+ aws.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 400
+
+ assert Credential.objects.count() == 0
+ assert 'username' in json.dumps(response.data)
+ assert 'password' in json.dumps(response.data)
+
+
+#
+# Rackspace Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'rax',
+ 'name': 'Best credential ever',
+ 'username': 'some_username',
+ 'password': 'some_password',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'username': 'some_username',
+ 'password': 'some_password',
+ }
+ }]
+])
+def test_rax_create_ok(post, organization, admin, version, params):
+ rax = CredentialType.defaults['rackspace']()
+ rax.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'rax',
+ 'name': 'Best credential ever'
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {}
+ }]
+])
+def test_rax_create_fail_required_field(post, organization, admin, version, params):
+ rax = CredentialType.defaults['rackspace']()
+ rax.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 400
+
+ assert Credential.objects.count() == 0
+ assert 'username' in json.dumps(response.data)
+ assert 'password' in json.dumps(response.data)
+
+
+#
+# VMware vCenter Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'vmware',
+ 'host': 'some_host',
+ 'name': 'Best credential ever',
+ 'username': 'some_username',
+ 'password': 'some_password'
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {
+ 'host': 'some_host',
+ 'username': 'some_username',
+ 'password': 'some_password'
+ }
+ }]
+])
+def test_vmware_create_ok(post, organization, admin, version, params):
+ vmware = CredentialType.defaults['vmware']()
+ vmware.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['host'] == 'some_host'
+ assert cred.inputs['username'] == 'some_username'
+ assert decrypt_field(cred, 'password') == 'some_password'
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'vmware',
+ 'name': 'Best credential ever',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'name': 'Best credential ever',
+ 'inputs': {}
+ }]
+])
+def test_vmware_create_fail_required_fields(post, organization, admin, version, params):
+ vmware = CredentialType.defaults['vmware']()
+ vmware.save()
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 400
+
+ assert Credential.objects.count() == 0
+ assert 'username' in json.dumps(response.data)
+ assert 'password' in json.dumps(response.data)
+ assert 'host' in json.dumps(response.data)
+
+
+#
+# Openstack Credentials
+#
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
'username': 'some_user',
'password': 'some_password',
'project': 'some_project',
'host': 'some_host',
- 'organization': organization.id,
- }
- response = post(reverse('api:credential_list'), data, admin)
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'inputs': {
+ 'username': 'some_user',
+ 'password': 'some_password',
+ 'project': 'some_project',
+ 'host': 'some_host',
+ }
+ }]
+])
+def test_openstack_create_ok(post, organization, admin, version, params):
+ openstack = CredentialType.defaults['openstack']()
+ openstack.save()
+ params['kind'] = 'openstack'
+ params['name'] = 'Best credential ever'
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
assert response.status_code == 201
@pytest.mark.django_db
-def test_openstack_create_fail_required_fields(post, organization, admin):
- data = {
- 'kind': 'openstack',
+@pytest.mark.parametrize('version, params', [
+ ['v1', {}],
+ ['v2', {
+ 'credential_type': 1,
+ 'inputs': {}
+ }]
+])
+def test_openstack_create_fail_required_fields(post, organization, admin, version, params):
+ openstack = CredentialType.defaults['openstack']()
+ openstack.save()
+ params['kind'] = 'openstack'
+ params['name'] = 'Best credential ever'
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 400
+ assert 'username' in json.dumps(response.data)
+ assert 'password' in json.dumps(response.data)
+ assert 'host' in json.dumps(response.data)
+ assert 'project' in json.dumps(response.data)
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
'name': 'Best credential ever',
- 'organization': organization.id,
+ 'kind': 'ssh',
+ 'username': 'joe',
+ 'password': '',
+ }],
+ ['v2', {
+ 'name': 'Best credential ever',
+ 'credential_type': 1,
+ 'inputs': {
+ 'username': 'joe',
+ 'password': '',
+ }
+ }]
+])
+def test_field_removal(put, organization, admin, credentialtype_ssh, version, params):
+ cred = Credential(
+ credential_type=credentialtype_ssh,
+ name='Best credential ever',
+ organization=organization,
+ inputs={
+ 'username': u'jim',
+ 'password': u'secret'
+ }
+ )
+ cred.save()
+
+ params['organization'] = organization.id
+ response = put(
+ reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}),
+ params,
+ admin
+ )
+ assert response.status_code == 200
+
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'joe'
+ assert 'password' not in cred.inputs
+
+
+#
+# test secret encryption/decryption
+#
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'kind': 'ssh',
+ 'username': 'joe',
+ 'password': 'secret',
+ }],
+ ['v2', {
+ 'credential_type': 1,
+ 'inputs': {
+ 'username': 'joe',
+ 'password': 'secret',
+ }
+ }]
+])
+def test_secret_encryption_on_create(get, post, organization, admin, credentialtype_ssh, version, params):
+ params['name'] = 'Best credential ever'
+ params['organization'] = organization.id
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ response = get(
+ reverse('api:credential_list', kwargs={'version': version}),
+ admin
+ )
+ assert response.status_code == 200
+ assert response.data['count'] == 1
+ cred = response.data['results'][0]
+ if version == 'v1':
+ assert cred['username'] == 'joe'
+ assert cred['password'] == '$encrypted$'
+ elif version == 'v2':
+ assert cred['inputs']['username'] == 'joe'
+ assert cred['inputs']['password'] == '$encrypted$'
+
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['password'].startswith('$encrypted$UTF8$AES')
+ assert decrypt_field(cred, 'password') == 'secret'
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'password': 'secret'}],
+ ['v2', {'inputs': {'username': 'joe', 'password': 'secret'}}]
+])
+def test_secret_encryption_on_update(get, post, patch, organization, admin, credentialtype_ssh, version, params):
+ response = post(
+ reverse('api:credential_list', kwargs={'version': 'v2'}),
+ {
+ 'name': 'Best credential ever',
+ 'organization': organization.id,
+ 'credential_type': 1,
+ 'inputs': {
+ 'username': 'joe',
+ }
+ },
+ admin
+ )
+ assert response.status_code == 201
+
+ response = patch(
+ reverse('api:credential_detail', kwargs={'pk': 1, 'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 200
+
+ response = get(
+ reverse('api:credential_list', kwargs={'version': version}),
+ admin
+ )
+ assert response.status_code == 200
+ assert response.data['count'] == 1
+ cred = response.data['results'][0]
+ if version == 'v1':
+ assert cred['username'] == 'joe'
+ assert cred['password'] == '$encrypted$'
+ elif version == 'v2':
+ assert cred['inputs']['username'] == 'joe'
+ assert cred['inputs']['password'] == '$encrypted$'
+
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['password'].startswith('$encrypted$UTF8$AES')
+ assert decrypt_field(cred, 'password') == 'secret'
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('version, params', [
+ ['v1', {
+ 'username': 'joe',
+ 'password': '$encrypted$',
+ }],
+ ['v2', {
+ 'inputs': {
+ 'username': 'joe',
+ 'password': '$encrypted$',
+ }
+ }]
+])
+def test_secret_encryption_previous_value(patch, organization, admin, credentialtype_ssh, version, params):
+ cred = Credential(
+ credential_type=credentialtype_ssh,
+ name='Best credential ever',
+ organization=organization,
+ inputs={
+ 'username': u'jim',
+ 'password': u'secret'
+ }
+ )
+ cred.save()
+
+ assert decrypt_field(cred, 'password') == 'secret'
+ response = patch(
+ reverse('api:credential_detail', kwargs={'pk': cred.pk, 'version': version}),
+ params,
+ admin
+ )
+ assert response.status_code == 200
+
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'joe'
+ assert cred.inputs['password'].startswith('$encrypted$UTF8$AES')
+ assert decrypt_field(cred, 'password') == 'secret'
+
+
+@pytest.mark.django_db
+def test_custom_credential_type_create(get, post, organization, admin):
+ credential_type = CredentialType(
+ kind='cloud',
+ name='MyCloud',
+ inputs = {
+ 'fields': [{
+ 'id': 'api_token',
+ 'label': 'API Token',
+ 'type': 'string',
+ 'secret': True
+ }]
+ }
+ )
+ credential_type.save()
+ params = {
+ 'name': 'Best credential ever',
+ 'organization': organization.pk,
+ 'credential_type': credential_type.pk,
+ 'inputs': {
+ 'api_token': 'secret'
+ }
}
- response = post(reverse('api:credential_list'), data, admin)
- assert response.status_code == 400
- assert 'username' in response.data
- assert 'password' in response.data
- assert 'host' in response.data
- assert 'project' in response.data
+ response = post(
+ reverse('api:credential_list', kwargs={'version': 'v2'}),
+ params,
+ admin
+ )
+ assert response.status_code == 201
+
+ response = get(
+ reverse('api:credential_list', kwargs={'version': 'v2'}),
+ admin
+ )
+ assert response.status_code == 200
+ assert response.data['count'] == 1
+ cred = response.data['results'][0]
+ assert cred['inputs']['api_token'] == '$encrypted$'
+
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['api_token'].startswith('$encrypted$UTF8$AES')
+ assert decrypt_field(cred, 'api_token') == 'secret'
#
@@ -394,11 +1350,16 @@ def test_openstack_create_fail_required_fields(post, organization, admin):
#
+@pytest.mark.parametrize('version, params', [
+ ['v1', {'name': 'Some name', 'username': 'someusername'}],
+ ['v2', {'name': 'Some name', 'credential_type': 1, 'inputs': {'username': 'someusername'}}]
+])
@pytest.mark.django_db
-def test_create_credential_missing_user_team_org_xfail(post, admin):
+def test_create_credential_missing_user_team_org_xfail(post, admin, credentialtype_ssh, version, params):
# Must specify one of user, team, or organization
- response = post(reverse('api:credential_list'), {
- 'name': 'Some name',
- 'username': 'someusername',
- }, admin)
+ response = post(
+ reverse('api:credential_list', kwargs={'version': version}),
+ params,
+ admin
+ )
assert response.status_code == 400
diff --git a/awx/main/tests/functional/api/test_credential_type.py b/awx/main/tests/functional/api/test_credential_type.py
new file mode 100644
index 0000000000..aa40651edf
--- /dev/null
+++ b/awx/main/tests/functional/api/test_credential_type.py
@@ -0,0 +1,205 @@
+import json
+
+import pytest
+
+from awx.main.models.credential import CredentialType
+from awx.api.versioning import reverse
+
+
+@pytest.mark.django_db
+def test_list_as_unauthorized_xfail(get):
+ response = get(reverse('api:credential_type_list'))
+ assert response.status_code == 401
+
+
+@pytest.mark.django_db
+def test_list_as_normal_user(get, alice):
+ response = get(reverse('api:credential_type_list'), alice)
+ assert response.status_code == 200
+ assert response.data['count'] == 0
+
+
+@pytest.mark.django_db
+def test_list_as_admin(get, admin):
+ response = get(reverse('api:credential_type_list'), admin)
+ assert response.status_code == 200
+ assert response.data['count'] == 0
+
+
+@pytest.mark.django_db
+def test_create_as_unauthorized_xfail(get, post):
+ response = post(reverse('api:credential_type_list'), {
+ 'name': 'Custom Credential Type',
+ })
+ assert response.status_code == 401
+
+
+@pytest.mark.django_db
+def test_update_as_unauthorized_xfail(patch):
+ ssh = CredentialType.defaults['ssh']()
+ ssh.save()
+ response = patch(
+ reverse('api:credential_type_detail', kwargs={'pk': ssh.pk}),
+ {
+ 'name': 'Some Other Name'
+ }
+ )
+ assert response.status_code == 401
+
+
+@pytest.mark.django_db
+def test_delete_as_unauthorized_xfail(delete):
+ ssh = CredentialType.defaults['ssh']()
+ ssh.save()
+ response = delete(
+ reverse('api:credential_type_detail', kwargs={'pk': ssh.pk}),
+ )
+ assert response.status_code == 401
+
+
+@pytest.mark.django_db
+def test_create_as_normal_user_xfail(get, post, alice):
+ response = post(reverse('api:credential_type_list'), {
+ 'name': 'Custom Credential Type',
+ }, alice)
+ assert response.status_code == 403
+ assert get(reverse('api:credential_type_list'), alice).data['count'] == 0
+
+
+@pytest.mark.django_db
+def test_create_as_admin(get, post, admin):
+ response = post(reverse('api:credential_type_list'), {
+ 'kind': 'cloud',
+ 'name': 'Custom Credential Type',
+ 'inputs': {},
+ 'injectors': {}
+ }, admin)
+ assert response.status_code == 201
+
+ response = get(reverse('api:credential_type_list'), admin)
+ assert response.data['count'] == 1
+ assert response.data['results'][0]['name'] == 'Custom Credential Type'
+ assert response.data['results'][0]['inputs'] == {}
+ assert response.data['results'][0]['injectors'] == {}
+ assert response.data['results'][0]['managed_by_tower'] is False
+
+
+@pytest.mark.django_db
+def test_create_managed_by_tower_readonly(get, post, admin):
+ response = post(reverse('api:credential_type_list'), {
+ 'kind': 'cloud',
+ 'name': 'Custom Credential Type',
+ 'inputs': {},
+ 'injectors': {},
+ 'managed_by_tower': True
+ }, admin)
+ assert response.status_code == 201
+
+ response = get(reverse('api:credential_type_list'), admin)
+ assert response.data['count'] == 1
+ assert response.data['results'][0]['managed_by_tower'] is False
+
+
+@pytest.mark.django_db
+def test_create_with_valid_inputs(get, post, admin):
+ response = post(reverse('api:credential_type_list'), {
+ 'kind': 'cloud',
+ 'name': 'MyCloud',
+ 'inputs': {
+ 'fields': [{
+ 'id': 'api_token',
+ 'label': 'API Token',
+ 'type': 'string',
+ 'secret': True,
+ 'ask_at_runtime': True
+ }]
+ },
+ 'injectors': {}
+ }, admin)
+ assert response.status_code == 201
+
+ response = get(reverse('api:credential_type_list'), admin)
+ assert response.data['count'] == 1
+ fields = response.data['results'][0]['inputs']['fields']
+ assert len(fields) == 1
+ assert fields[0]['id'] == 'api_token'
+ assert fields[0]['label'] == 'API Token'
+ assert fields[0]['ask_at_runtime'] is True
+ assert fields[0]['secret'] is True
+ assert fields[0]['type'] == 'string'
+
+
+@pytest.mark.django_db
+def test_create_with_invalid_inputs_xfail(post, admin):
+ response = post(reverse('api:credential_type_list'), {
+ 'kind': 'cloud',
+ 'name': 'MyCloud',
+ 'inputs': {'feeelds': {},},
+ 'injectors': {}
+ }, admin)
+ assert response.status_code == 400
+ assert "'feeelds' was unexpected" in json.dumps(response.data)
+
+
+@pytest.mark.django_db
+def test_create_with_valid_injectors(get, post, admin):
+ response = post(reverse('api:credential_type_list'), {
+ 'kind': 'cloud',
+ 'name': 'MyCloud',
+ 'inputs': {
+ 'fields': [{
+ 'id': 'api_token',
+ 'label': 'API Token',
+ 'type': 'string',
+ 'secret': True,
+ 'ask_at_runtime': True
+ }]
+ },
+ 'injectors': {
+ 'env': {
+ 'ANSIBLE_MY_CLOUD_TOKEN': '{{api_token}}'
+ }
+ }
+ }, admin)
+ assert response.status_code == 201
+
+ response = get(reverse('api:credential_type_list'), admin)
+ assert response.data['count'] == 1
+ injectors = response.data['results'][0]['injectors']
+ assert len(injectors) == 1
+ assert injectors['env'] == {
+ 'ANSIBLE_MY_CLOUD_TOKEN': '{{api_token}}'
+ }
+
+
+@pytest.mark.django_db
+def test_create_with_invalid_injectors_xfail(post, admin):
+ response = post(reverse('api:credential_type_list'), {
+ 'kind': 'cloud',
+ 'name': 'MyCloud',
+ 'inputs': {},
+ 'injectors': {'nonsense': 123}
+ }, admin)
+ assert response.status_code == 400
+
+
+@pytest.mark.django_db
+def test_create_with_undefined_template_variable_xfail(post, admin):
+ response = post(reverse('api:credential_type_list'), {
+ 'kind': 'cloud',
+ 'name': 'MyCloud',
+ 'inputs': {
+ 'fields': [{
+ 'id': 'api_token',
+ 'label': 'API Token',
+ 'type': 'string',
+ 'secret': True,
+ 'ask_at_runtime': True
+ }]
+ },
+ 'injectors': {
+ 'env': {'ANSIBLE_MY_CLOUD_TOKEN': '{{api_tolkien}}'}
+ }
+ }, admin)
+ assert response.status_code == 400
+ assert "'api_tolkien' is undefined" in json.dumps(response.data)
diff --git a/awx/main/tests/functional/api/test_job_runtime_params.py b/awx/main/tests/functional/api/test_job_runtime_params.py
index 99e4e6c0fd..bf12977b1b 100644
--- a/awx/main/tests/functional/api/test_job_runtime_params.py
+++ b/awx/main/tests/functional/api/test_job_runtime_params.py
@@ -10,8 +10,15 @@ from awx.api.versioning import reverse
@pytest.fixture
-def runtime_data(organization):
- cred_obj = Credential.objects.create(name='runtime-cred', kind='ssh', username='test_user2', password='pas4word2')
+def runtime_data(organization, credentialtype_ssh):
+ cred_obj = Credential.objects.create(
+ name='runtime-cred',
+ credential_type=credentialtype_ssh,
+ inputs={
+ 'username': 'test_user2',
+ 'password': 'pas4word2'
+ }
+ )
inv_obj = organization.inventories.create(name="runtime-inv")
return dict(
extra_vars='{"job_launch_var": 4}',
diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py
index fd89e3df15..3e7248f4eb 100644
--- a/awx/main/tests/functional/conftest.py
+++ b/awx/main/tests/functional/conftest.py
@@ -28,7 +28,7 @@ from rest_framework.test import (
force_authenticate,
)
-from awx.main.models.credential import Credential
+from awx.main.models.credential import CredentialType, Credential
from awx.main.models.jobs import JobTemplate
from awx.main.models.inventory import (
Group,
@@ -191,18 +191,43 @@ def organization(instance):
@pytest.fixture
-def credential():
- return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret')
+def credentialtype_ssh():
+ ssh = CredentialType.defaults['ssh']()
+ ssh.save()
+ return ssh
@pytest.fixture
-def machine_credential():
- return Credential.objects.create(name='machine-cred', kind='ssh', username='test_user', password='pas4word')
+def credentialtype_aws():
+ aws = CredentialType.defaults['aws']()
+ aws.save()
+ return aws
@pytest.fixture
-def org_credential(organization):
- return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret', organization=organization)
+def credentialtype_net():
+ net = CredentialType.defaults['net']()
+ net.save()
+ return net
+
+
+@pytest.fixture
+def credential(credentialtype_aws):
+ return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
+ inputs={'username': 'something', 'password': 'secret'})
+
+
+@pytest.fixture
+def machine_credential(credentialtype_ssh):
+ return Credential.objects.create(credential_type=credentialtype_ssh, name='machine-cred',
+ inputs={'username': 'test_user', 'password': 'pas4word'})
+
+
+@pytest.fixture
+def org_credential(organization, credentialtype_aws):
+ return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
+ inputs={'username': 'something', 'password': 'secret'},
+ organization=organization)
@pytest.fixture
diff --git a/awx/main/tests/functional/test_credential.py b/awx/main/tests/functional/test_credential.py
new file mode 100644
index 0000000000..908992e623
--- /dev/null
+++ b/awx/main/tests/functional/test_credential.py
@@ -0,0 +1,289 @@
+# Copyright (c) 2017 Ansible by Red Hat
+# All Rights Reserved.
+
+import pytest
+from django.core.exceptions import ValidationError
+
+from awx.main.utils.common import decrypt_field
+from awx.main.models import Credential, CredentialType
+
+
+@pytest.mark.django_db
+def test_default_cred_types():
+ assert sorted(CredentialType.defaults.keys()) == [
+ 'aws',
+ 'azure',
+ 'azure_rm',
+ 'cloudforms',
+ 'gce',
+ 'net',
+ 'openstack',
+ 'rackspace',
+ 'satellite6',
+ 'scm',
+ 'ssh',
+ 'vault',
+ 'vmware',
+ ]
+ for type_ in CredentialType.defaults.values():
+ assert type_().managed_by_tower is True
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('kind', ['net', 'scm', 'ssh', 'vault'])
+def test_cred_type_kind_uniqueness(kind):
+ """
+ non-cloud credential types are exclusive_on_kind (you can only use *one* of
+ them at a time)
+ """
+ assert CredentialType.defaults[kind]().unique_by_kind is True
+
+
+@pytest.mark.django_db
+def test_cloud_kind_uniqueness():
+ """
+ you can specify more than one cloud credential type (as long as they have
+ different names so you don't e.g., use ec2 twice")
+ """
+ assert CredentialType.defaults['aws']().unique_by_kind is False
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('input_, valid', [
+ ({}, True),
+ ({'fields': []}, True),
+ ({'fields': {}}, False),
+ ({'fields': 123}, False),
+ ({'fields': [{'id': 'username', 'label': 'Username', 'foo': 'bar'}]}, False),
+ ({'fields': [{'id': 'username', 'label': 'Username', 'type': 'string'}]}, True),
+ ({'fields': [{'id': 'username', 'label': 'Username', 'help_text': 1}]}, False),
+ ({'fields': [{'id': 'username', 'label': 'Username', 'help_text': 'Help Text'}]}, True), # noqa
+ ({'fields': [{'id': 'password', 'label': 'Password', 'type': 'number'}]}, True),
+ ({'fields': [{'id': 'ssh_key', 'label': 'SSH Key', 'type': 'ssh_private_key'}]}, True), # noqa
+ ({'fields': [{'id': 'other', 'label': 'Other', 'type': 'boolean'}]}, False),
+ ({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': True}]}, True),
+ ({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': 'bad'}]}, False), # noqa
+ ({'fields': [{'id': 'token', 'label': 'Token', 'secret': True}]}, True),
+ ({'fields': [{'id': 'token', 'label': 'Token', 'secret': 'bad'}]}, False),
+ ({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': True}]}, True),
+ ({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': 'bad'}]}, False), # noqa
+ ({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': 'not-a-list'}]}, False), # noqa
+ ({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': []}]}, False),
+ ({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': ['su', 'sudo']}]}, True), # noqa
+ ({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': ['dup', 'dup']}]}, False), # noqa
+])
+def test_cred_type_input_schema_validity(input_, valid):
+ type_ = CredentialType(
+ kind='cloud',
+ name='SomeCloud',
+ managed_by_tower=True,
+ inputs=input_
+ )
+ if valid is False:
+ with pytest.raises(ValidationError):
+ type_.full_clean()
+ else:
+ type_.full_clean()
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('injectors, valid', [
+ ({}, True),
+ ({'invalid-injector': {}}, False),
+ ({'file': 123}, False),
+ ({'file': {}}, False),
+ ({'file': {'template': '{{username}}'}}, True),
+ ({'file': {'foo': 'bar'}}, False),
+ ({'ssh': 123}, False),
+ ({'ssh': {}}, False),
+ ({'ssh': {'public': 'PUB'}}, False),
+ ({'ssh': {'private': 'PRIV'}}, False),
+ ({'ssh': {'public': 'PUB', 'private': 'PRIV'}}, True),
+ ({'ssh': {'public': 'PUB', 'private': 'PRIV', 'a': 'b'}}, False),
+ ({'password': {}}, False),
+ ({'password': {'key': 'Password:'}}, False),
+ ({'password': {'value': '{{pass}}'}}, False),
+ ({'password': {'key': 'Password:', 'value': '{{pass}}'}}, True),
+ ({'password': {'key': 'Password:', 'value': '{{pass}}', 'a': 'b'}}, False),
+ ({'env': 123}, False),
+ ({'env': {}}, True),
+ ({'env': {'AWX_SECRET': '{{awx_secret}}'}}, True),
+ ({'env': {'AWX_SECRET_99': '{{awx_secret}}'}}, True),
+ ({'env': {'99': '{{awx_secret}}'}}, False),
+ ({'env': {'AWX_SECRET=': '{{awx_secret}}'}}, False),
+ ({'extra_vars': 123}, False),
+ ({'extra_vars': {}}, True),
+ ({'extra_vars': {'hostname': '{{host}}'}}, True),
+ ({'extra_vars': {'hostname_99': '{{host}}'}}, True),
+ ({'extra_vars': {'99': '{{host}}'}}, False),
+ ({'extra_vars': {'99=': '{{host}}'}}, False),
+])
+def test_cred_type_injectors_schema(injectors, valid):
+ type_ = CredentialType(
+ kind='cloud',
+ name='SomeCloud',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [
+ {'id': 'username', 'type': 'string', 'label': '_'},
+ {'id': 'pass', 'type': 'string', 'label': '_'},
+ {'id': 'awx_secret', 'type': 'string', 'label': '_'},
+ {'id': 'host', 'type': 'string', 'label': '_'},
+ ]
+ },
+ injectors=injectors
+ )
+ if valid is False:
+ with pytest.raises(ValidationError):
+ type_.full_clean()
+ else:
+ type_.full_clean()
+
+
+@pytest.mark.django_db
+def test_credential_creation(organization_factory):
+ org = organization_factory('test').organization
+ type_ = CredentialType(
+ kind='cloud',
+ name='SomeCloud',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username for SomeCloud',
+ 'type': 'string'
+ }]
+ }
+ )
+ type_.save()
+
+ cred = Credential(credential_type=type_, name="Bob's Credential",
+ inputs={'username': 'bob'}, organization=org)
+ cred.save()
+ cred.full_clean()
+ assert isinstance(cred, Credential)
+ assert cred.name == "Bob's Credential"
+ assert cred.inputs['username'] == cred.username == 'bob'
+
+
+@pytest.mark.django_db
+def test_credential_creation_validation_failure(organization_factory):
+ org = organization_factory('test').organization
+ type_ = CredentialType(
+ kind='cloud',
+ name='SomeCloud',
+ managed_by_tower=True,
+ inputs={
+ 'fields': [{
+ 'id': 'username',
+ 'label': 'Username for SomeCloud',
+ 'type': 'string'
+ }]
+ }
+ )
+ type_.save()
+
+ with pytest.raises(ValidationError):
+ cred = Credential(credential_type=type_, name="Bob's Credential",
+ inputs={'user': 'wrong-key'}, organization=org)
+ cred.save()
+ cred.full_clean()
+
+
+@pytest.mark.django_db
+def test_credential_encryption(organization_factory, credentialtype_ssh):
+ org = organization_factory('test').organization
+ cred = Credential(
+ credential_type=credentialtype_ssh,
+ name="Bob's Credential",
+ inputs={'password': 'testing123'},
+ organization=org
+ )
+ cred.save()
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['password'].startswith('$encrypted$')
+ assert decrypt_field(cred, 'password') == 'testing123'
+
+
+@pytest.mark.django_db
+def test_credential_encryption_with_ask(organization_factory, credentialtype_ssh):
+ org = organization_factory('test').organization
+ cred = Credential(
+ credential_type=credentialtype_ssh,
+ name="Bob's Credential",
+ inputs={'password': 'ASK'},
+ organization=org
+ )
+ cred.save()
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['password'] == 'ASK'
+
+
+@pytest.mark.django_db
+def test_credential_with_multiple_secrets(organization_factory, credentialtype_ssh):
+ org = organization_factory('test').organization
+ cred = Credential(
+ credential_type=credentialtype_ssh,
+ name="Bob's Credential",
+ inputs={'ssh_key_data': 'SOMEKEY', 'ssh_key_unlock': 'testing123'},
+ organization=org
+ )
+ cred.save()
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+
+ assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
+ assert decrypt_field(cred, 'ssh_key_data') == 'SOMEKEY'
+ assert cred.inputs['ssh_key_unlock'].startswith('$encrypted$')
+ assert decrypt_field(cred, 'ssh_key_unlock') == 'testing123'
+
+
+@pytest.mark.django_db
+def test_credential_update(organization_factory, credentialtype_ssh):
+ org = organization_factory('test').organization
+ cred = Credential(
+ credential_type=credentialtype_ssh,
+ name="Bob's Credential",
+ inputs={'password': 'testing123'},
+ organization=org
+ )
+ cred.save()
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ cred.inputs['password'] = 'newpassword'
+ cred.save()
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['password'].startswith('$encrypted$')
+ assert decrypt_field(cred, 'password') == 'newpassword'
+
+
+@pytest.mark.django_db
+def test_credential_update_with_prior(organization_factory, credentialtype_ssh):
+ org = organization_factory('test').organization
+ cred = Credential(
+ credential_type=credentialtype_ssh,
+ name="Bob's Credential",
+ inputs={'password': 'testing123'},
+ organization=org
+ )
+ cred.save()
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ cred.inputs['username'] = 'joe'
+ cred.inputs['password'] = '$encrypted$'
+ cred.save()
+
+ assert Credential.objects.count() == 1
+ cred = Credential.objects.all()[:1].get()
+ assert cred.inputs['username'] == 'joe'
+ assert cred.inputs['password'].startswith('$encrypted$')
+ assert decrypt_field(cred, 'password') == 'testing123'
diff --git a/awx/main/tests/functional/test_db_credential.py b/awx/main/tests/functional/test_db_credential.py
index 90ec21d4cd..9ed823deb9 100644
--- a/awx/main/tests/functional/test_db_credential.py
+++ b/awx/main/tests/functional/test_db_credential.py
@@ -5,12 +5,12 @@ from awx.main.models import Credential
@pytest.mark.django_db
-def test_cred_unique_org_name_kind(organization_factory):
+def test_cred_unique_org_name_kind(organization_factory, credentialtype_ssh):
objects = organization_factory("test")
- cred = Credential(name="test", kind="net", organization=objects.organization)
+ cred = Credential(name="test", credential_type=credentialtype_ssh, organization=objects.organization)
cred.save()
with pytest.raises(IntegrityError):
- cred = Credential(name="test", kind="net", organization=objects.organization)
+ cred = Credential(name="test", credential_type=credentialtype_ssh, organization=objects.organization)
cred.save()
diff --git a/awx/main/tests/functional/test_rbac_credential.py b/awx/main/tests/functional/test_rbac_credential.py
index 6c87a53d27..c7e2224684 100644
--- a/awx/main/tests/functional/test_rbac_credential.py
+++ b/awx/main/tests/functional/test_rbac_credential.py
@@ -21,12 +21,12 @@ def test_credential_migration_user(credential, user, permissions):
@pytest.mark.django_db
-def test_two_teams_same_cred_name(organization_factory):
+def test_two_teams_same_cred_name(organization_factory, credentialtype_net):
objects = organization_factory("test",
teams=["team1", "team2"])
- cred1 = Credential.objects.create(name="test", kind="net", deprecated_team=objects.teams.team1)
- cred2 = Credential.objects.create(name="test", kind="net", deprecated_team=objects.teams.team2)
+ cred1 = Credential.objects.create(name="test", credential_type=credentialtype_net, deprecated_team=objects.teams.team1)
+ cred2 = Credential.objects.create(name="test", credential_type=credentialtype_net, deprecated_team=objects.teams.team2)
rbac.migrate_credential(apps, None)
@@ -119,7 +119,7 @@ def test_credential_access_auditor(credential, organization_factory):
@pytest.mark.django_db
-def test_credential_access_admin(user, team, credential):
+def test_credential_access_admin(user, team, credential, credentialtype_aws):
u = user('org-admin', False)
team.organization.admin_role.members.add(u)
@@ -137,7 +137,7 @@ def test_credential_access_admin(user, team, credential):
credential.admin_role.parents.add(team.admin_role)
credential.save()
- cred = Credential.objects.create(kind='aws', name='test-cred')
+ cred = Credential.objects.create(credential_type=credentialtype_aws, name='test-cred')
cred.deprecated_team = team
cred.save()
diff --git a/awx/main/tests/functional/test_rbac_job_start.py b/awx/main/tests/functional/test_rbac_job_start.py
index 4b6c092680..fcb82572c5 100644
--- a/awx/main/tests/functional/test_rbac_job_start.py
+++ b/awx/main/tests/functional/test_rbac_job_start.py
@@ -51,13 +51,20 @@ class TestJobRelaunchAccess:
return jt.create_unified_job()
@pytest.fixture
- def job_with_prompts(self, machine_credential, inventory, organization):
+ def job_with_prompts(self, machine_credential, inventory, organization, credentialtype_ssh):
jt = JobTemplate.objects.create(
name='test-job-template-prompts', credential=machine_credential, inventory=inventory,
ask_tags_on_launch=True, ask_variables_on_launch=True, ask_skip_tags_on_launch=True,
ask_limit_on_launch=True, ask_job_type_on_launch=True, ask_inventory_on_launch=True,
ask_credential_on_launch=True)
- new_cred = Credential.objects.create(name='new-cred', kind='ssh', username='test_user', password='pas4word')
+ new_cred = Credential.objects.create(
+ name='new-cred',
+ credential_type=credentialtype_ssh,
+ inputs={
+ 'username': 'test_user',
+ 'password': 'pas4word'
+ }
+ )
new_inv = Inventory.objects.create(name='new-inv', organization=organization)
return jt.create_unified_job(credential=new_cred, inventory=new_inv)
diff --git a/awx/main/tests/functional/test_rbac_project.py b/awx/main/tests/functional/test_rbac_project.py
index 9f7cfbe705..65fc0614bf 100644
--- a/awx/main/tests/functional/test_rbac_project.py
+++ b/awx/main/tests/functional/test_rbac_project.py
@@ -8,7 +8,7 @@ from awx.main.migrations import _old_access as old_access
@pytest.mark.django_db
-def test_project_migration():
+def test_project_migration(credentialtype_ssh):
'''
o1 o2 o3 with o1 -- i1 o2 -- i2
@@ -59,7 +59,7 @@ def test_project_migration():
o2 = Organization.objects.create(name='o2')
o3 = Organization.objects.create(name='o3')
- c1 = Credential.objects.create(name='c1')
+ c1 = Credential.objects.create(name='c1', credential_type=credentialtype_ssh)
project_name = unicode("\xc3\xb4", "utf-8")
p1 = Project.objects.create(name=project_name, credential=c1)
diff --git a/awx/main/tests/unit/models/test_workflow_unit.py b/awx/main/tests/unit/models/test_workflow_unit.py
index ce288dced1..59d19e943e 100644
--- a/awx/main/tests/unit/models/test_workflow_unit.py
+++ b/awx/main/tests/unit/models/test_workflow_unit.py
@@ -1,7 +1,7 @@
import pytest
from awx.main.models.jobs import JobTemplate
-from awx.main.models import Inventory, Credential, Project
+from awx.main.models import Inventory, CredentialType, Credential, Project
from awx.main.models.workflow import (
WorkflowJobTemplate, WorkflowJobTemplateNode, WorkflowJobOptions,
WorkflowJob, WorkflowJobNode
@@ -125,7 +125,12 @@ def job_node_no_prompts(workflow_job_unit, jt_ask):
def job_node_with_prompts(job_node_no_prompts):
job_node_no_prompts.char_prompts = example_prompts
job_node_no_prompts.inventory = Inventory(name='example-inv')
- job_node_no_prompts.credential = Credential(name='example-inv', kind='ssh', username='asdf', password='asdf')
+ ssh_type = CredentialType.defaults['ssh']()
+ job_node_no_prompts.credential = Credential(
+ name='example-inv',
+ credential_type=ssh_type,
+ inputs={'username': 'asdf', 'password': 'asdf'}
+ )
return job_node_no_prompts
@@ -138,7 +143,12 @@ def wfjt_node_no_prompts(workflow_job_template_unit, jt_ask):
def wfjt_node_with_prompts(wfjt_node_no_prompts):
wfjt_node_no_prompts.char_prompts = example_prompts
wfjt_node_no_prompts.inventory = Inventory(name='example-inv')
- wfjt_node_no_prompts.credential = Credential(name='example-inv', kind='ssh', username='asdf', password='asdf')
+ ssh_type = CredentialType.defaults['ssh']()
+ wfjt_node_no_prompts.credential = Credential(
+ name='example-inv',
+ credential_type=ssh_type,
+ inputs={'username': 'asdf', 'password': 'asdf'}
+ )
return wfjt_node_no_prompts
diff --git a/awx/main/tests/unit/test_network_credential.py b/awx/main/tests/unit/test_network_credential.py
index 8cdf720af3..ec9c85c816 100644
--- a/awx/main/tests/unit/test_network_credential.py
+++ b/awx/main/tests/unit/test_network_credential.py
@@ -1,6 +1,6 @@
import pytest
-from awx.main.models.credential import Credential
+from awx.main.models.credential import CredentialType, Credential
from awx.main.models.jobs import Job
from awx.main.models.inventory import Inventory
from awx.main.tasks import RunJob
@@ -10,12 +10,15 @@ def test_aws_cred_parse(mocker):
with mocker.patch('django.db.ConnectionRouter.db_for_write'):
job = Job(id=1)
job.inventory = mocker.MagicMock(spec=Inventory, id=2)
+ aws = CredentialType.defaults['aws']()
options = {
- 'kind': 'aws',
- 'username': 'aws_user',
- 'password': 'aws_passwd',
- 'security_token': 'token',
+ 'credential_type': aws,
+ 'inputs': {
+ 'username': 'aws_user',
+ 'password': 'aws_passwd',
+ 'security_token': 'token',
+ }
}
job.cloud_credential = Credential(**options)
@@ -23,22 +26,26 @@ def test_aws_cred_parse(mocker):
mocker.patch.object(run_job, 'should_use_proot', return_value=False)
env = run_job.build_env(job, private_data_dir='/tmp')
- assert env['AWS_ACCESS_KEY'] == options['username']
- assert env['AWS_SECRET_KEY'] == options['password']
- assert env['AWS_SECURITY_TOKEN'] == options['security_token']
+ assert env['AWS_ACCESS_KEY'] == options['inputs']['username']
+ assert env['AWS_SECRET_KEY'] == options['inputs']['password']
+ assert env['AWS_SECURITY_TOKEN'] == options['inputs']['security_token']
def test_net_cred_parse(mocker):
with mocker.patch('django.db.ConnectionRouter.db_for_write'):
job = Job(id=1)
job.inventory = mocker.MagicMock(spec=Inventory, id=2)
+ net = CredentialType.defaults['aws']()
options = {
- 'username':'test',
- 'password':'test',
- 'authorize': True,
- 'authorize_password': 'passwd',
- 'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
+ 'credential_type': net,
+ 'inputs': {
+ 'username':'test',
+ 'password':'test',
+ 'authorize': True,
+ 'authorize_password': 'passwd',
+ 'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
+ }
}
private_data_files = {
'network_credential': '/tmp/this_file_does_not_exist_during_test_but_the_path_is_real',
@@ -49,21 +56,25 @@ def test_net_cred_parse(mocker):
mocker.patch.object(run_job, 'should_use_proot', return_value=False)
env = run_job.build_env(job, private_data_dir='/tmp', private_data_files=private_data_files)
- assert env['ANSIBLE_NET_USERNAME'] == options['username']
- assert env['ANSIBLE_NET_PASSWORD'] == options['password']
+ assert env['ANSIBLE_NET_USERNAME'] == options['inputs']['username']
+ assert env['ANSIBLE_NET_PASSWORD'] == options['inputs']['password']
assert env['ANSIBLE_NET_AUTHORIZE'] == '1'
- assert env['ANSIBLE_NET_AUTH_PASS'] == options['authorize_password']
+ assert env['ANSIBLE_NET_AUTH_PASS'] == options['inputs']['authorize_password']
assert env['ANSIBLE_NET_SSH_KEYFILE'] == private_data_files['network_credential']
@pytest.fixture
def mock_job(mocker):
+ ssh = CredentialType.defaults['ssh']()
options = {
- 'username':'test',
- 'password':'test',
- 'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
- 'authorize': True,
- 'authorize_password': 'passwd',
+ 'credential_type': ssh,
+ 'inputs': {
+ 'username':'test',
+ 'password':'test',
+ 'ssh_key_data': """-----BEGIN PRIVATE KEY-----\nstuff==\n-----END PRIVATE KEY-----""",
+ 'authorize': True,
+ 'authorize_password': 'passwd',
+ }
}
mock_job_attrs = {'forks': False, 'id': 1, 'cancel_flag': False, 'status': 'running', 'job_type': 'normal',
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 9bab230595..dd8a0a9a73 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -281,6 +281,7 @@ REST_FRAMEWORK = {
'VIEW_NAME_FUNCTION': 'awx.api.generics.get_view_name',
'VIEW_DESCRIPTION_FUNCTION': 'awx.api.generics.get_view_description',
'NON_FIELD_ERRORS_KEY': '__all__',
+ 'DEFAULT_VERSION': 'v2'
}
AUTHENTICATION_BACKENDS = (
diff --git a/tools/data_generators/rbac_dummy_data_generator.py b/tools/data_generators/rbac_dummy_data_generator.py
index 0e906a9e47..fb5c7d034c 100755
--- a/tools/data_generators/rbac_dummy_data_generator.py
+++ b/tools/data_generators/rbac_dummy_data_generator.py
@@ -321,7 +321,7 @@ try:
name='%s Credential %d User %d' % (prefix, credential_id, user_idx),
defaults=dict(created_by=next(creator_gen),
modified_by=next(modifier_gen)),
- kind='ssh'
+ credential_type=CredentialType.from_v1_kind('ssh')
)
credential.admin_role.members.add(user)
credentials.append(credential)
@@ -344,7 +344,7 @@ try:
name='%s Credential %d team %d' % (prefix, credential_id, team_idx),
defaults=dict(created_by=next(creator_gen),
modified_by=next(modifier_gen)),
- kind='ssh'
+ credential_type=CredentialType.from_v1_kind('ssh')
)
credential.admin_role.parents.add(team.member_role)
credentials.append(credential)