Skip to content

Commit

Permalink
Move cred type unite tests to awx-plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismeyersfsu committed Dec 19, 2024
1 parent bd96000 commit cf9e679
Showing 1 changed file with 0 additions and 262 deletions.
262 changes: 0 additions & 262 deletions awx/main/tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import configparser
import json
import os
import shutil
Expand Down Expand Up @@ -856,205 +855,6 @@ def test_multi_vault_password_ask(self, private_data_dir, job, mock_me):
assert '--vault-id dev@prompt' in ' '.join(args)
assert '--vault-id prod@prompt' in ' '.join(args)

@pytest.mark.parametrize("verify", (True, False))
def test_k8s_credential(self, job, private_data_dir, verify, mock_me):
k8s = CredentialType.defaults['kubernetes_bearer_token']()
inputs = {
'host': 'https://example.org/',
'bearer_token': 'token123',
}
if verify:
inputs['verify_ssl'] = True
inputs['ssl_ca_cert'] = 'CERTDATA'
credential = Credential(
pk=1,
credential_type=k8s,
inputs=inputs,
)
credential.inputs['bearer_token'] = encrypt_field(credential, 'bearer_token')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

assert env['K8S_AUTH_HOST'] == 'https://example.org/'
assert env['K8S_AUTH_API_KEY'] == 'token123'

if verify:
assert env['K8S_AUTH_VERIFY_SSL'] == 'True'
local_path = to_host_path(env['K8S_AUTH_SSL_CA_CERT'], private_data_dir)
with open(local_path, 'r') as f:
cert = f.read()
assert cert == 'CERTDATA'
else:
assert env['K8S_AUTH_VERIFY_SSL'] == 'False'
assert 'K8S_AUTH_SSL_CA_CERT' not in env

assert safe_env['K8S_AUTH_API_KEY'] == HIDDEN_PASSWORD

def test_aws_cloud_credential(self, job, private_data_dir, mock_me):
aws = CredentialType.defaults['aws']()
credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret'})
credential.inputs['password'] = encrypt_field(credential, 'password')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

assert env['AWS_ACCESS_KEY_ID'] == 'bob'
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
assert 'AWS_SECURITY_TOKEN' not in env
assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD

def test_aws_cloud_credential_with_sts_token(self, private_data_dir, job, mock_me):
aws = CredentialType.defaults['aws']()
credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret', 'security_token': 'token'})
for key in ('password', 'security_token'):
credential.inputs[key] = encrypt_field(credential, key)
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

assert env['AWS_ACCESS_KEY_ID'] == 'bob'
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
assert env['AWS_SECURITY_TOKEN'] == 'token'
assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD

@pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS'])
def test_gce_credentials(self, cred_env_var, private_data_dir, job, mock_me):
gce = CredentialType.defaults['gce']()
credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
credential.inputs['ssh_key_data'] = encrypt_field(credential, 'ssh_key_data')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
runner_path = env[cred_env_var]
local_path = to_host_path(runner_path, private_data_dir)
with open(local_path, 'rb') as f_host:
json_data = json.load(f_host)
assert json_data['type'] == 'service_account'
assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY
assert json_data['client_email'] == 'bob'
assert json_data['project_id'] == 'some-project'

def test_azure_rm_with_tenant(self, private_data_dir, job, mock_me):
azure = CredentialType.defaults['azure_rm']()
credential = Credential(
pk=1, credential_type=azure, inputs={'client': 'some-client', 'secret': 'some-secret', 'tenant': 'some-tenant', 'subscription': 'some-subscription'}
)
credential.inputs['secret'] = encrypt_field(credential, 'secret')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

assert env['AZURE_CLIENT_ID'] == 'some-client'
assert env['AZURE_SECRET'] == 'some-secret'
assert env['AZURE_TENANT'] == 'some-tenant'
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
assert safe_env['AZURE_SECRET'] == HIDDEN_PASSWORD

def test_azure_rm_with_password(self, private_data_dir, job, mock_me):
azure = CredentialType.defaults['azure_rm']()
credential = Credential(
pk=1, credential_type=azure, inputs={'subscription': 'some-subscription', 'username': 'bob', 'password': 'secret', 'cloud_environment': 'foobar'}
)
credential.inputs['password'] = encrypt_field(credential, 'password')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
assert env['AZURE_AD_USER'] == 'bob'
assert env['AZURE_PASSWORD'] == 'secret'
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD

def test_vmware_credentials(self, private_data_dir, job, mock_me):
vmware = CredentialType.defaults['vmware']()
credential = Credential(pk=1, credential_type=vmware, inputs={'username': 'bob', 'password': 'secret', 'host': 'https://example.org'})
credential.inputs['password'] = encrypt_field(credential, 'password')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

assert env['VMWARE_USER'] == 'bob'
assert env['VMWARE_PASSWORD'] == 'secret'
assert env['VMWARE_HOST'] == 'https://example.org'
assert safe_env['VMWARE_PASSWORD'] == HIDDEN_PASSWORD

def test_openstack_credentials(self, private_data_dir, job, mock_me):
task = jobs.RunJob()
task.instance = job
openstack = CredentialType.defaults['openstack']()
credential = Credential(
pk=1, credential_type=openstack, inputs={'username': 'bob', 'password': 'secret', 'project': 'tenant-name', 'host': 'https://keystone.example.org'}
)
credential.inputs['password'] = encrypt_field(credential, 'password')
job.credentials.add(credential)

private_data_files, ssh_key_data = task.build_private_data_files(job, private_data_dir)
env = task.build_env(job, private_data_dir, private_data_files=private_data_files)
credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)

config_loc = to_host_path(env['OS_CLIENT_CONFIG_FILE'], private_data_dir)
with open(config_loc, 'r') as f:
shade_config = f.read()
assert shade_config == '\n'.join(
[
'clouds:',
' devstack:',
' auth:',
' auth_url: https://keystone.example.org',
' password: secret',
' project_name: tenant-name',
' username: bob',
' verify: true',
'',
]
)

@pytest.mark.parametrize("ca_file", [None, '/path/to/some/file'])
def test_rhv_credentials(self, private_data_dir, job, ca_file, mock_me):
rhv = CredentialType.defaults['rhv']()
inputs = {
'host': 'some-ovirt-host.example.org',
'username': 'bob',
'password': 'some-pass',
}
if ca_file:
inputs['ca_file'] = ca_file
credential = Credential(pk=1, credential_type=rhv, inputs=inputs)
credential.inputs['password'] = encrypt_field(credential, 'password')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

config = configparser.ConfigParser()
host_path = to_host_path(env['OVIRT_INI_PATH'], private_data_dir)
config.read(host_path)
assert config.get('ovirt', 'ovirt_url') == 'some-ovirt-host.example.org'
assert config.get('ovirt', 'ovirt_username') == 'bob'
assert config.get('ovirt', 'ovirt_password') == 'some-pass'
if ca_file:
assert config.get('ovirt', 'ovirt_ca_file') == ca_file
else:
with pytest.raises(configparser.NoOptionError):
config.get('ovirt', 'ovirt_ca_file')

@pytest.mark.parametrize(
'authorize, expected_authorize',
[
Expand Down Expand Up @@ -1089,68 +889,6 @@ def test_net_credentials(self, authorize, expected_authorize, job, private_data_
assert f.read() == self.EXAMPLE_PRIVATE_KEY
assert safe_env['ANSIBLE_NET_PASSWORD'] == HIDDEN_PASSWORD

def test_terraform_cloud_credentials(self, job, private_data_dir, mock_me):
terraform = CredentialType.defaults['terraform']()
hcl_config = '''
backend "s3" {
bucket = "s3_sample_bucket"
key = "/tf_state/"
region = "us-east-1"
}
'''
credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config})
credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
with open(local_path, 'r') as f:
config = f.read()
assert config == hcl_config

def test_terraform_gcs_backend_credentials(self, job, private_data_dir, mock_me):
terraform = CredentialType.defaults['terraform']()
hcl_config = '''
backend "gcs" {
bucket = "gce_storage"
}
'''
gce_backend_credentials = '''
{
"type": "service_account",
"project_id": "sample",
"private_key_id": "eeeeeeeeeeeeeeeeeeeeeeeeeee",
"private_key": "-----BEGIN PRIVATE KEY-----\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n-----END PRIVATE KEY-----\n",
"client_email": "[email protected]",
"client_id": "0123456789",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/cloud-content-robot%40sample.iam.gserviceaccount.com",
}
'''
credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config, 'gce_credentials': gce_backend_credentials})
credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
credential.inputs['gce_credentials'] = encrypt_field(credential, 'gce_credentials')
job.credentials.add(credential)

env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)

local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
with open(local_path, 'r') as f:
config = f.read()
assert config == hcl_config

credentials_path = to_host_path(env['GOOGLE_BACKEND_CREDENTIALS'], private_data_dir)
with open(credentials_path, 'r') as f:
credentials = f.read()
assert credentials == gce_backend_credentials

def test_multi_cloud(self, private_data_dir, mock_me):
gce = CredentialType.defaults['gce']()
gce_credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
Expand Down

0 comments on commit cf9e679

Please sign in to comment.