# path: root/awx_plugins/credentials/conjur.py
# blob: a7fd3a3a654a670e40335cb40b815447416ceab4 (plain)
from .plugin import CredentialPlugin, CertFiles, raise_for_status

from urllib.parse import urljoin, quote

from .plugin import translate_function as _
import requests
import base64
import binascii


# Input schema for the Conjur lookup plugin.
# The ids under 'fields' and 'metadata' are the keyword names that
# conjur_backend reads from its **kwargs; 'required' lists the ids that
# must be supplied.
conjur_inputs = {
    'fields': [
        {'id': 'url', 'label': _('Conjur URL'), 'type': 'string', 'format': 'url'},
        # Flagged 'secret' -- presumably so the UI/storage layer masks it; confirm with the plugin framework.
        {'id': 'api_key', 'label': _('API Key'), 'type': 'string', 'secret': True},
        {'id': 'account', 'label': _('Account'), 'type': 'string'},
        {'id': 'username', 'label': _('Username'), 'type': 'string'},
        # Optional: not listed in 'required'; conjur_backend passes it to CertFiles.
        {
            'id': 'cacert',
            'label': _('Public Key Certificate'),
            'type': 'string',
            'multiline': True,
        },
    ],
    'metadata': [
        {
            'id': 'secret_path',
            'label': _('Secret Identifier'),
            'type': 'string',
            'help_text': _('The identifier for the secret e.g., /some/identifier'),
        },
        {
            'id': 'secret_version',
            'label': _('Secret Version'),
            'type': 'string',
            'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'),
        },
    ],
    'required': ['url', 'api_key', 'account', 'username'],
}


def _is_base64(s: str) -> bool:
    try:
        return base64.b64encode(base64.b64decode(s.encode("utf-8"))) == s.encode("utf-8")
    except binascii.Error:
        return False


def conjur_backend(**kwargs):
    """Authenticate against Conjur and return the value of one secret.

    Keyword arguments (ids match ``conjur_inputs``):
        url: Base URL of the Conjur server. Endpoint paths are resolved
            against it with ``urljoin``, so a base URL whose path does not
            end in ``/`` loses its last path segment.
        api_key: API key sent as the body of the authenticate request.
        account: Conjur account name (percent-encoded before use).
        username: Login/identity name (percent-encoded before use).
        secret_path: Identifier of the variable to read (percent-encoded).
        secret_version: Optional version; when truthy it is appended as a
            ``version=`` query parameter.
        cacert: Optional certificate text handed to ``CertFiles``; the
            value it yields is used as the ``verify`` argument of requests.

    Returns:
        The body of the secret-lookup response as text.

    Raises:
        KeyError: if ``url``, ``api_key``, ``account``, ``username`` or
            ``secret_path`` is missing.
        requests.exceptions.RequestException: on connection failures or
            timeouts; HTTP error statuses of the final response are handled
            by the project helper ``raise_for_status``.
    """
    url = kwargs['url']
    api_key = kwargs['api_key']
    account = quote(kwargs['account'], safe='')
    username = quote(kwargs['username'], safe='')
    secret_path = quote(kwargs['secret_path'], safe='')
    version = kwargs.get('secret_version')
    cacert = kwargs.get('cacert', None)

    # Applied to every HTTP call below. Without a timeout, requests waits
    # indefinitely on an unresponsive server.
    request_timeout = 30

    auth_kwargs = {
        'headers': {'Content-Type': 'text/plain', 'Accept-Encoding': 'base64'},
        'data': api_key,
        'allow_redirects': False,
        'timeout': request_timeout,
    }

    with CertFiles(cacert) as cert:
        # https://www.conjur.org/api.html#authentication-authenticate-post
        auth_kwargs['verify'] = cert
        try:
            resp = requests.post(urljoin(url, '/'.join(['authn', account, username, 'authenticate'])), **auth_kwargs)
            resp.raise_for_status()
        except requests.exceptions.HTTPError:
            # The first endpoint answered with an HTTP error status: retry
            # the same request under the 'api/' prefix.
            resp = requests.post(urljoin(url, '/'.join(['api', 'authn', account, username, 'authenticate'])), **auth_kwargs)
    raise_for_status(resp)
    token = resp.content.decode('utf-8')

    # The Authorization header carries the token base64-encoded; encode it
    # here only when the server did not already return it in that form.
    if _is_base64(token):
        encoded_token = token
    else:
        encoded_token = base64.b64encode(token.encode('utf-8')).decode('utf-8')

    lookup_kwargs = {
        'headers': {'Authorization': 'Token token="{}"'.format(encoded_token)},
        'allow_redirects': False,
        'timeout': request_timeout,
    }

    # https://www.conjur.org/api.html#secrets-retrieve-a-secret-get
    path = urljoin(url, '/'.join(['secrets', account, 'variable', secret_path]))
    # Fallback location under 'api/' (presumably the Conjur Cloud layout,
    # judging by the variable name).
    path_conjurcloud = urljoin(url, '/'.join(['api', 'secrets', account, 'variable', secret_path]))
    if version:
        # Percent-encode the version like the other user-supplied parts so
        # it cannot inject extra query parameters or a fragment.
        ver = "version={}".format(quote(str(version), safe=''))
        path = '?'.join([path, ver])
        path_conjurcloud = '?'.join([path_conjurcloud, ver])

    with CertFiles(cacert) as cert:
        lookup_kwargs['verify'] = cert
        try:
            resp = requests.get(path, **lookup_kwargs)
            resp.raise_for_status()
        except requests.exceptions.HTTPError:
            resp = requests.get(path_conjurcloud, **lookup_kwargs)
    raise_for_status(resp)
    return resp.text


# Plugin object tying the display name, the input schema and the lookup
# function together.
conjur_plugin = CredentialPlugin(
    'CyberArk Conjur Secrets Manager Lookup',
    inputs=conjur_inputs,
    backend=conjur_backend,
)