From c7f4c48090b8a209fd319a97470cbfbc4b6d73df Mon Sep 17 00:00:00 2001 From: Daniel Black Date: Sat, 15 Mar 2014 11:18:01 +1100 Subject: [PATCH] TST/BF: more changes to make sure testcases can be run on live install --- fail2ban/tests/clientreadertestcase.py | 337 ++++++++++++------------- 1 file changed, 168 insertions(+), 169 deletions(-) diff --git a/fail2ban/tests/clientreadertestcase.py b/fail2ban/tests/clientreadertestcase.py index 0213263e45..c54793bfa0 100644 --- a/fail2ban/tests/clientreadertestcase.py +++ b/fail2ban/tests/clientreadertestcase.py @@ -32,10 +32,8 @@ from .utils import LogCaptureTestCase TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "files") -if os.path.exists(os.path.join('config','fail2ban.conf')): - CONFIG_DIR='config' -else: - CONFIG_DIR='/etc/fail2ban' +STOCK = os.path.exists(os.path.join('config','fail2ban.conf')) +CONFIG_DIR='config' if STOCK else '/etc/fail2ban' IMPERFECT_CONFIG = os.path.join(os.path.dirname(__file__), 'config') @@ -185,14 +183,15 @@ def TODOtestJailActionBrokenDef(self): self.assertTrue(self._is_logged('Caught exception: While reading action joho[foo we should have got 1 or 2 groups. Got: 0')) - def testStockSSHJail(self): - jail = JailReader('sshd', basedir=CONFIG_DIR) # we are running tests from root project dir atm - self.assertTrue(jail.read()) - self.assertTrue(jail.getOptions()) - self.assertFalse(jail.isEnabled()) - self.assertEqual(jail.getName(), 'sshd') - jail.setName('ssh-funky-blocker') - self.assertEqual(jail.getName(), 'ssh-funky-blocker') + if STOCK: + def testStockSSHJail(self): + jail = JailReader('sshd', basedir=CONFIG_DIR) # we are running tests from root project dir atm + self.assertTrue(jail.read()) + self.assertTrue(jail.getOptions()) + self.assertFalse(jail.isEnabled()) + self.assertEqual(jail.getName(), 'sshd') + jail.setName('ssh-funky-blocker') + self.assertEqual(jail.getName(), 'ssh-funky-blocker') def testSplitOption(self): # Simple example @@ -407,163 +406,163 @@ def testReadTestJailConf(self): self.assertTrue(self._is_logged("Errors in jail 'missingbitsjail'. Skipping...")) self.assertTrue(self._is_logged("No file(s) found for glob /weapons/of/mass/destruction")) - - def testReadStockJailConf(self): - jails = JailsReader(basedir=CONFIG_DIR) # we are running tests from root project dir atm - self.assertTrue(jails.read()) # opens fine - self.assertTrue(jails.getOptions()) # reads fine - comm_commands = jails.convert() - # by default None of the jails is enabled and we get no - # commands to communicate to the server - self.assertEqual(comm_commands, []) - - # TODO: make sure this is handled well - ## We should not "read" some bogus jail - #old_comm_commands = comm_commands[:] # make a copy - #self.assertRaises(ValueError, jails.getOptions, "BOGUS") - #self.printLog() - #self.assertTrue(self._is_logged("No section: 'BOGUS'")) - ## and there should be no side-effects - #self.assertEqual(jails.convert(), old_comm_commands) - - allFilters = set() - - # All jails must have filter and action set - # TODO: evolve into a parametric test - for jail in jails.sections(): - if jail == 'INCLUDES': - continue - filterName = jails.get(jail, 'filter') - allFilters.add(filterName) - self.assertTrue(len(filterName)) - # moreover we must have a file for it - # and it must be readable as a Filter - filterReader = FilterReader(filterName, jail, {}) - filterReader.setBaseDir(CONFIG_DIR) - self.assertTrue(filterReader.read(),"Failed to read filter:" + filterName) # opens fine - filterReader.getOptions({}) # reads fine - - # test if filter has failregex set - self.assertTrue(filterReader._opts.get('failregex', '').strip()) - - actions = jails.get(jail, 'action') - self.assertTrue(len(actions.strip())) - - # somewhat duplicating here what is done in JailsReader if - # the jail is enabled - for act in actions.split('\n'): - actName, actOpt = JailReader.extractOptions(act) - self.assertTrue(len(actName)) - self.assertTrue(isinstance(actOpt, dict)) - if actName == 'iptables-multiport': - self.assertTrue('port' in actOpt) - - actionReader = ActionReader( - actName, jail, {}, basedir=CONFIG_DIR) - self.assertTrue(actionReader.read()) - actionReader.getOptions({}) # populate _opts - cmds = actionReader.convert() - self.assertTrue(len(cmds)) - - # all must have some actionban - self.assertTrue(actionReader._opts.get('actionban', '').strip()) - - # Verify that all filters found under config/ have a jail - def testReadStockJailFilterComplete(self): - jails = JailsReader(basedir=CONFIG_DIR, force_enable=True) - self.assertTrue(jails.read()) # opens fine - self.assertTrue(jails.getOptions()) # reads fine - # grab all filter names - filters = set(os.path.splitext(os.path.split(a)[1])[0] - for a in glob.glob(os.path.join('config', 'filter.d', '*.conf')) - if not a.endswith('common.conf')) - filters_jail = set(jail.options['filter'] for jail in jails.jails) - self.maxDiff = None - self.assertTrue(filters.issubset(filters_jail), - "More filters exists than are referenced in stock jail.conf %r" % filters.difference(filters_jail)) - self.assertTrue(filters_jail.issubset(filters), - "Stock jail.conf references non-existent filters %r" % filters_jail.difference(filters)) - - def testReadStockJailConfForceEnabled(self): - # more of a smoke test to make sure that no obvious surprises - # on users' systems when enabling shipped jails - jails = JailsReader(basedir=CONFIG_DIR, force_enable=True) # we are running tests from root project dir atm - self.assertTrue(jails.read()) # opens fine - self.assertTrue(jails.getOptions()) # reads fine - comm_commands = jails.convert(allow_no_files=True) - - # by default we have lots of jails ;) - self.assertTrue(len(comm_commands)) - - # and we know even some of them by heart - for j in ['sshd', 'recidive']: - # by default we have 'auto' backend ATM - self.assertTrue(['add', j, 'auto'] in comm_commands) - # and warn on useDNS - self.assertTrue(['set', j, 'usedns', 'warn'] in comm_commands) - self.assertTrue(['start', j] in comm_commands) - - # last commands should be the 'start' commands - self.assertEqual(comm_commands[-1][0], 'start') - - for j in jails._JailsReader__jails: - actions = j._JailReader__actions - jail_name = j.getName() - # make sure that all of the jails have actions assigned, - # otherwise it makes little to no sense - self.assertTrue(len(actions), - msg="No actions found for jail %s" % jail_name) - - # Test for presence of blocktype (in relation to gh-232) - for action in actions: - commands = action.convert() - action_name = action.getName() - if '' in str(commands): - # Verify that it is among cInfo - self.assertTrue('blocktype' in action._initOpts) - # Verify that we have a call to set it up - blocktype_present = False - target_command = ['set', jail_name, 'action', action_name, 'blocktype'] - for command in commands: - if (len(command) > 5 and - command[:5] == target_command): - blocktype_present = True - continue - self.assertTrue( - blocktype_present, - msg="Found no %s command among %s" - % (target_command, str(commands)) ) - - - def testConfigurator(self): - configurator = Configurator() - configurator.setBaseDir(CONFIG_DIR) - self.assertEqual(configurator.getBaseDir(), CONFIG_DIR) - - configurator.readEarly() - opts = configurator.getEarlyOptions() - # our current default settings - self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock') - self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid') - - configurator.getOptions() - configurator.convertToProtocol() - commands = configurator.getConfigStream() - # and there is logging information left to be passed into the - # server - self.assertEqual(sorted(commands), - [['set', 'dbfile', - '/var/lib/fail2ban/fail2ban.sqlite3'], - ['set', 'dbpurgeage', 86400], - ['set', 'loglevel', "INFO"], - ['set', 'logtarget', '/var/log/fail2ban.log']]) - - # and if we force change configurator's fail2ban's baseDir - # there should be an error message (test visually ;) -- - # otherwise just a code smoke test) - configurator._Configurator__jails.setBaseDir('/tmp') - self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp') - self.assertEqual(configurator.getBaseDir(), CONFIG_DIR) + if STOCK: + def testReadStockJailConf(self): + jails = JailsReader(basedir=CONFIG_DIR) # we are running tests from root project dir atm + self.assertTrue(jails.read()) # opens fine + self.assertTrue(jails.getOptions()) # reads fine + comm_commands = jails.convert() + # by default None of the jails is enabled and we get no + # commands to communicate to the server + self.assertEqual(comm_commands, []) + + # TODO: make sure this is handled well + ## We should not "read" some bogus jail + #old_comm_commands = comm_commands[:] # make a copy + #self.assertRaises(ValueError, jails.getOptions, "BOGUS") + #self.printLog() + #self.assertTrue(self._is_logged("No section: 'BOGUS'")) + ## and there should be no side-effects + #self.assertEqual(jails.convert(), old_comm_commands) + + allFilters = set() + + # All jails must have filter and action set + # TODO: evolve into a parametric test + for jail in jails.sections(): + if jail == 'INCLUDES': + continue + filterName = jails.get(jail, 'filter') + allFilters.add(filterName) + self.assertTrue(len(filterName)) + # moreover we must have a file for it + # and it must be readable as a Filter + filterReader = FilterReader(filterName, jail, {}) + filterReader.setBaseDir(CONFIG_DIR) + self.assertTrue(filterReader.read(),"Failed to read filter:" + filterName) # opens fine + filterReader.getOptions({}) # reads fine + + # test if filter has failregex set + self.assertTrue(filterReader._opts.get('failregex', '').strip()) + + actions = jails.get(jail, 'action') + self.assertTrue(len(actions.strip())) + + # somewhat duplicating here what is done in JailsReader if + # the jail is enabled + for act in actions.split('\n'): + actName, actOpt = JailReader.extractOptions(act) + self.assertTrue(len(actName)) + self.assertTrue(isinstance(actOpt, dict)) + if actName == 'iptables-multiport': + self.assertTrue('port' in actOpt) + + actionReader = ActionReader( + actName, jail, {}, basedir=CONFIG_DIR) + self.assertTrue(actionReader.read()) + actionReader.getOptions({}) # populate _opts + cmds = actionReader.convert() + self.assertTrue(len(cmds)) + + # all must have some actionban + self.assertTrue(actionReader._opts.get('actionban', '').strip()) + + # Verify that all filters found under config/ have a jail + def testReadStockJailFilterComplete(self): + jails = JailsReader(basedir=CONFIG_DIR, force_enable=True) + self.assertTrue(jails.read()) # opens fine + self.assertTrue(jails.getOptions()) # reads fine + # grab all filter names + filters = set(os.path.splitext(os.path.split(a)[1])[0] + for a in glob.glob(os.path.join('config', 'filter.d', '*.conf')) + if not a.endswith('common.conf')) + filters_jail = set(jail.options['filter'] for jail in jails.jails) + self.maxDiff = None + self.assertTrue(filters.issubset(filters_jail), + "More filters exists than are referenced in stock jail.conf %r" % filters.difference(filters_jail)) + self.assertTrue(filters_jail.issubset(filters), + "Stock jail.conf references non-existent filters %r" % filters_jail.difference(filters)) + + def testReadStockJailConfForceEnabled(self): + # more of a smoke test to make sure that no obvious surprises + # on users' systems when enabling shipped jails + jails = JailsReader(basedir=CONFIG_DIR, force_enable=True) # we are running tests from root project dir atm + self.assertTrue(jails.read()) # opens fine + self.assertTrue(jails.getOptions()) # reads fine + comm_commands = jails.convert(allow_no_files=True) + + # by default we have lots of jails ;) + self.assertTrue(len(comm_commands)) + + # and we know even some of them by heart + for j in ['sshd', 'recidive']: + # by default we have 'auto' backend ATM + self.assertTrue(['add', j, 'auto'] in comm_commands) + # and warn on useDNS + self.assertTrue(['set', j, 'usedns', 'warn'] in comm_commands) + self.assertTrue(['start', j] in comm_commands) + + # last commands should be the 'start' commands + self.assertEqual(comm_commands[-1][0], 'start') + + for j in jails._JailsReader__jails: + actions = j._JailReader__actions + jail_name = j.getName() + # make sure that all of the jails have actions assigned, + # otherwise it makes little to no sense + self.assertTrue(len(actions), + msg="No actions found for jail %s" % jail_name) + + # Test for presence of blocktype (in relation to gh-232) + for action in actions: + commands = action.convert() + action_name = action.getName() + if '' in str(commands): + # Verify that it is among cInfo + self.assertTrue('blocktype' in action._initOpts) + # Verify that we have a call to set it up + blocktype_present = False + target_command = ['set', jail_name, 'action', action_name, 'blocktype'] + for command in commands: + if (len(command) > 5 and + command[:5] == target_command): + blocktype_present = True + continue + self.assertTrue( + blocktype_present, + msg="Found no %s command among %s" + % (target_command, str(commands)) ) + + + def testStockConfigurator(self): + configurator = Configurator() + configurator.setBaseDir(CONFIG_DIR) + self.assertEqual(configurator.getBaseDir(), CONFIG_DIR) + + configurator.readEarly() + opts = configurator.getEarlyOptions() + # our current default settings + self.assertEqual(opts['socket'], '/var/run/fail2ban/fail2ban.sock') + self.assertEqual(opts['pidfile'], '/var/run/fail2ban/fail2ban.pid') + + configurator.getOptions() + configurator.convertToProtocol() + commands = configurator.getConfigStream() + # and there is logging information left to be passed into the + # server + self.assertEqual(sorted(commands), + [['set', 'dbfile', + '/var/lib/fail2ban/fail2ban.sqlite3'], + ['set', 'dbpurgeage', 86400], + ['set', 'loglevel', "INFO"], + ['set', 'logtarget', '/var/log/fail2ban.log']]) + + # and if we force change configurator's fail2ban's baseDir + # there should be an error message (test visually ;) -- + # otherwise just a code smoke test) + configurator._Configurator__jails.setBaseDir('/tmp') + self.assertEqual(configurator._Configurator__jails.getBaseDir(), '/tmp') + self.assertEqual(configurator.getBaseDir(), CONFIG_DIR) def testMultipleSameAction(self): basedir = tempfile.mkdtemp("fail2ban_conf")