Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NAS-130509 / 25.04 / Add CI test for discovery auth #15010

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 142 additions & 11 deletions tests/api2/test_261_iscsi_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,21 @@
import pyscsi
import pytest
import requests
from assets.websocket.iscsi import (alua_enabled, initiator, initiator_portal,
portal, read_capacity16, target,
target_extent_associate, verify_capacity,
verify_luns, verify_ha_inquiry, verify_ha_device_identification, TUR)
from assets.websocket.iscsi import (TUR, alua_enabled, initiator, initiator_portal, portal, read_capacity16, target,
target_extent_associate, verify_capacity, verify_ha_device_identification,
verify_ha_inquiry, verify_luns)
from assets.websocket.service import ensure_service_enabled, ensure_service_started
from auto_config import ha, hostname, isns_ip, password, pool_name, user
from functions import SSH_TEST
from protocols import ISCSIDiscover, initiator_name_supported, iscsi_scsi_connection, isns_connection
from pyscsi.pyscsi.scsi_sense import sense_ascq_dict
from pytest_dependency import depends

from middlewared.service_exception import InstanceNotFound, ValidationError, ValidationErrors
from middlewared.test.integration.assets.iscsi import target_login_test
from middlewared.test.integration.assets.pool import dataset, snapshot
from middlewared.test.integration.utils import call, ssh
from middlewared.test.integration.utils.client import truenas_server
from pyscsi.pyscsi.scsi_sense import sense_ascq_dict
from pytest_dependency import depends

from auto_config import ha, hostname, isns_ip, password, pool_name, user
from functions import SSH_TEST
from protocols import (initiator_name_supported, iscsi_scsi_connection,
isns_connection)

# Setup some flags that will enable/disable tests based upon the capabilities of the
# python-scsi package in use
Expand Down Expand Up @@ -84,6 +82,14 @@ def __str__(self):
CONTROLLER_B_TARGET_PORT_GROUP_ID = 102
SERVICE_NAME = 'iscsitarget'

CHAPUSER1 = 'chapuser1'
CHAPPASS1 = 'chappassword1'

CHAPUSER2 = 'chapuser2'
CHAPPASS2 = 'userpassword2'
CHAPPEERUSER2 = 'chappeer2'
CHAPPEERPASS2 = 'peerpassword2'

# Some variables
digit = ''.join(random.choices(string.digits, k=2))
file_mountpoint = f'/tmp/iscsi-file-{hostname}'
Expand Down Expand Up @@ -785,6 +791,131 @@ def test__discovery_auth():
assert [] == call('iscsi.auth.query')


@contextlib.contextmanager
def _discovery(ip):
with ISCSIDiscover(ip) as nocred:
with ISCSIDiscover(ip, CHAPUSER1, CHAPPASS1) as user1:
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2,
CHAPPEERUSER2, CHAPPEERPASS2) as user2:
yield {
'nocred': nocred,
'user1': user1,
'user2': user2,
}


def _discovery_validate_one(disc: ISCSIDiscover, iqns: set):
result = disc.discover()
assert set(result.keys()) == iqns


def _discovery_validate_all(discs: dict, iqns: set):
for disc in discs.values():
_discovery_validate_one(disc, iqns)


def test__discover_from_initiator(iscsi_running):
"""
Verify that discovery auth operates as expected, by performing iSCSI
discovery operations from the initiator in various configs.
"""
name1 = f"{target_name}x1"
name2 = f"{target_name}x2"
iqn1 = f'{basename}:{name1}'
iqn2 = f'{basename}:{name2}'

EMPTY_SET = set()
ONE_IQN_SET = set([iqn1])
TWO_IQNS_SET = set([iqn1, iqn2])

def _discovery_validate_two_targets(ip: str, discs: dict):
_discovery_validate_one(discs['nocred'], TWO_IQNS_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
# Create an auth without discovery_auth and ensure it has
# no impact.
with iscsi_auth(1, CHAPUSER1, CHAPPASS1):
_discovery_validate_one(discs['nocred'], TWO_IQNS_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
# Create an auth with CHAP discovery_auth and ensure it means only
# a discovery with the correct cred works.
with iscsi_auth(1, CHAPUSER1, CHAPPASS1, discovery_auth='CHAP'):
_discovery_validate_one(discs['nocred'], EMPTY_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER1, "WrongChapPass") as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
"WrongChapUser", CHAPPASS1) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
# Create a 2nd auth and ensure they both work
with iscsi_auth(2, CHAPUSER2, CHAPPASS2, discovery_auth='CHAP'):
_discovery_validate_one(discs['nocred'], EMPTY_SET)
_discovery_validate_one(discs['user1'], TWO_IQNS_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2) as gooddisc:
_discovery_validate_one(gooddisc, TWO_IQNS_SET)
# Create an auth with CHAP_MUTUAL discovery_auth and ensure it means only
# a discovery with the correct cred works.
with iscsi_auth(2, CHAPUSER2, CHAPPASS2,
CHAPPEERUSER2, CHAPPEERPASS2,
discovery_auth='CHAP_MUTUAL'):
_discovery_validate_one(discs['nocred'], EMPTY_SET)
_discovery_validate_one(discs['user1'], EMPTY_SET)
_discovery_validate_one(discs['user2'], TWO_IQNS_SET)
with ISCSIDiscover(ip,
"WrongChapUser", CHAPPASS2,
CHAPPEERUSER2, CHAPPEERPASS2) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, "WrongChapPass",
CHAPPEERUSER2, CHAPPEERPASS2) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2,
"WrongChapPeer", CHAPPEERPASS2) as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)
with ISCSIDiscover(ip,
CHAPUSER2, CHAPPASS2,
CHAPPEERUSER2, "WrongPeerPass") as baddisc:
_discovery_validate_one(baddisc, EMPTY_SET)

with initiator_portal() as config:
with _discovery(truenas_server.ip) as discs:
# No targets published yet, ensure we see none via discovery
_discovery_validate_all(discs, EMPTY_SET)
with configured_target(config, name1, "VOLUME"):
# One target published, ensure we see it via discovery
_discovery_validate_one(discs['nocred'], ONE_IQN_SET)
_discovery_validate_one(discs['user1'], ONE_IQN_SET)
_discovery_validate_one(discs['user2'], EMPTY_SET)
with configured_target(config, name2, "VOLUME"):
# Two target published, ensure we see them via discovery
_discovery_validate_two_targets(truenas_server.ip, discs)
if ha:
# If we are a HA system then enable ALUA and perform a bunch of
# similar tests
with alua_enabled():
_ensure_alua_state(True)
_wait_for_alua_settle()
with _discovery(truenas_server.nodea_ip) as nodea_discs:
with _discovery(truenas_server.nodeb_ip) as nodeb_discs:
# No targets published yet, ensure we see none via discovery
_discovery_validate_all(nodea_discs, EMPTY_SET)
_discovery_validate_all(nodeb_discs, EMPTY_SET)
with configured_target(config, name1, "VOLUME"):
with configured_target(config, name2, "VOLUME"):
_discovery_validate_two_targets(truenas_server.nodea_ip, nodea_discs)
_discovery_validate_two_targets(truenas_server.nodeb_ip, nodeb_discs)

# Turned off ALUA again
_wait_for_alua_settle()


def test__report_luns(iscsi_running):
"""
This tests REPORT LUNS and accessing multiple LUNs on a target.
Expand Down
4 changes: 2 additions & 2 deletions tests/protocols/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from functions import DELETE, POST

from .ftp_proto import ftp_connect, ftps_connect, ftp_connection, ftps_connection # noqa
from .iscsi_proto import initiator_name_supported, iscsi_scsi_connect, iscsi_scsi_connection # noqa
from .ftp_proto import ftp_connect, ftp_connection, ftps_connect, ftps_connection # noqa
from .iscsi_proto import ISCSIDiscover, initiator_name_supported, iscsi_scsi_connect, iscsi_scsi_connection # noqa
from .iSNSP.client import iSNSPClient
from .ms_rpc import MS_RPC # noqa
from .nfs_proto import SSH_NFS # noqa
Expand Down
70 changes: 70 additions & 0 deletions tests/protocols/iscsi_proto.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import contextlib
import inspect
import socket

import iscsi
from functions import SRVTarget, get_host_ip
from pyscsi.pyscsi.scsi import SCSI
from pyscsi.utils import init_device

Expand Down Expand Up @@ -75,3 +78,70 @@ def iscsi_scsi_connection(host, iqn, lun=0, user=None, secret=None, target_user=
yield s
finally:
s.device.close()


class ISCSIDiscover:
def __init__(self,
hostname=None,
initiator_username=None,
initiator_password=None,
target_username=None,
target_password=None,
initiator_name=None,
):
self._hostname = hostname or get_host_ip(SRVTarget.DEFAULT)
self._initiator_username = None
self._initiator_password = None
self._target_username = None
self._target_password = None
self._initiator_name = None

try:
self._ip = socket.gethostbyname(self._hostname)
except socket.error:
raise ValueError(f'Cannot resolve: {self._hostname}')

if initiator_username is not None or initiator_password is not None:
if initiator_username is None or initiator_password is None:
raise ValueError("If supply one then must supply both: initiator_username, initiator_password")
self._initiator_username = initiator_username
self._initiator_password = initiator_password

if target_username is not None or target_password is not None:
if target_username is None or target_password is None:
raise ValueError("If supply one then must supply both: target_username, target_password")
self._target_username = target_username
self._target_password = target_password

if initiator_name:
self._initiator_name = initiator_name
else:
self._initiator_name = f'iqn.2018-01.org.pyscsi:{socket.gethostname()}'

def __enter__(self):
return self

def discover(self):
connected = False
try:
ctx = iscsi.Context(self._initiator_name)
ctx.set_session_type(iscsi.ISCSI_SESSION_DISCOVERY)
ctx.set_header_digest(iscsi.ISCSI_HEADER_DIGEST_NONE)
if self._initiator_username and self._initiator_password:
ctx.set_initiator_username_pwd(self._initiator_username, self._initiator_password)
if self._target_username and self._target_password:
ctx.set_target_username_pwd(self._target_username, self._target_password)
ctx.connect(self._ip, -1)
connected = True
return ctx.discover()
except Exception:
return {}
finally:
if connected:
ctx.disconnect()

def ip(self):
return self._ip

def __exit__(self, exc_type, exc_val, exc_tb):
pass
Loading