Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added auth refractor : Improved code for login.py #161

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 44 additions & 147 deletions core/login.py
Original file line number Diff line number Diff line change
@@ -1,165 +1,62 @@
import requests
from . import zapscan
from . import parsers
import utils.logger
import json
import base64
import ast
import sys
import time
from utils.config import update_value, get_value
import utils.logs as logs

from utils.config import update_value,get_value

from . import zapscan, parsers

class APILogin:

def __init__(self):
self.api_logger = zapscan.logger()
self.parse_data = parsers.PostmanParser()

def fetch_logintoken(self,url,method,headers,body=None,relogin=None):
if method.upper() == "GET":
login_request = requests.get(url,headers=headers)
elif method.upper() == "POST":
login_request = requests.post(url,headers=headers,json=body)
logs.logging.info("HTTP response of login API : %s %s %s",login_request.status_code,headers,body)
else:
print("[-]Invalid request")
sys.exit(1)

def fetch_logintoken(self, url, method, headers, body=None, relogin=None):
try:
login_response = json.loads(login_request.text)
except:
pass

if relogin is not None:
print("Session fixation attack won't be tested since it failed to re-login.")
return

auth_names = get_value('config.property','login','auth_names')
auth_type = get_value('config.property','login','auth_type')
auth_names = auth_names.split(',')
#auth_header = get_value('config.property','login','auth_header')
if method.upper() == "GET":
response = requests.get(url, headers=headers)
elif method.upper() == "POST":
response = requests.post(url, headers=headers, json=body)
logs.logging.info("HTTP response of login API: %s", response.status_code)
else:
raise ValueError("Invalid HTTP method")

response_data = response.json()
return self.handle_authentication(response, response_data, url)

# Auth types:
# 1. Cookie
# 2. Basic
# 3. Oauth
auth_status = False
if auth_type == 'cookie':
if login_request.headers['Set-Cookie']:
auth_cookie = {'cookie' : login_request.headers['Set-Cookie']}
print("[+]Login successful")
update_value('login','auth_success','Y')
update_value('login','cookie',auth_cookie)
auth_status = True
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
logs.logging.error(f"Error during login request: {e}")
sys.exit(1)

def handle_authentication(self, response, response_data, url):
auth_type = get_value('config.property', 'login', 'auth_type')
auth_names = get_value('config.property', 'login', 'auth_names').split(',')

# Basic and oauth auth type code will come here(yet to develop).
if auth_type == 'cookie' and 'Set-Cookie' in response.headers:
auth_cookie = {'cookie': response.headers['Set-Cookie']}
update_value('login', 'cookie', auth_cookie)
update_value('login', 'auth_success', 'Y')
logs.logging.info("[+] Login successful via cookie.")
return True
else:
for auth_name in auth_names:
if auth_name in login_response:
auth_success_token = login_response[auth_name]
print("[+]Login successful")
update_value('login','auth_success','Y')
update_value('login','auth_success_param',auth_name)
update_value('login','auth_success_token',auth_success_token)
auth_status = True
break

if not auth_status:
login_response = input("Failed to login. Do you want to continue scanning without cookie(y/n),"+self.api_logger.G+url+': '+self.api_logger.W)
if login_response == 'Y' or login_response == 'y':
return
elif login_response == 'n' or login_response == 'N':
sys.exit(1)


def create_urllist(self,collection_data):
url_list = []
for data in collection_data:
try:
url = data['url']['raw']
url_list.append(url)
except:
url = data['url']
url_list.append(url)
return url_list


def verify_logout(self,collection_data,api_url):
if api_url is not None:
for data in collection_data:
if data['url'] == api_url:
url,method,headers,body = data['url'],data['method'],data['headers'],data['body']
logout_data = {'logouturl' : url, 'logoutmethod' : method ,'logoutheaders' : headers, 'logoutbody' : body,'logoutresult' : 'Y'}
break

for key,value in list(logout_data.items()):
update_value("logout",key,value)
return
else:
print("Failed")

def auth_verify(self,collection_data,api):
login_names = ['login', 'signin','authenticate']
logout_names = ['logout','signout']
if api == 'login':
api_types = login_names
elif api == 'logout':
api_types = logout_names

if auth_name in response_data:
auth_token = response_data[auth_name]
update_value('login', 'auth_success', 'Y')
update_value('login', 'auth_success_param', auth_name)
update_value('login', 'auth_success_token', auth_token)
logs.logging.info("[+] Login successful via token.")
return True
logs.logging.error("Login failed.")
return False

def auth_verify(self, collection_data, api):
url_list = self.create_urllist(collection_data)

api_types = ['login', 'signin', 'authenticate'] if api == 'login' else ['logout', 'signout']

for url in url_list:
for name in api_types:
if name in url:
if api == 'login':
result = input("Is it a correct login URL(y/n),"+self.api_logger.G+url+': '+self.api_logger.W)
elif api == 'logout':
result = input("Is it a correct logout URL(y/n),"+self.api_logger.G+url+': '+self.api_logger.W)

if result == 'y' or result == 'Y':
return url,api
else:
return None,None
return None,None


def verify_login(self,collection_data):
api_url,api_type = self.auth_verify(collection_data,'login')
logs.logging.info("API URL for login is : %s",api_url)
if api_url is None:
auth_response = input(self.api_logger.Y+"[-]Failed to detect login url. Do you want to contiune without authentication?(y/n):"+self.api_logger.W)
if auth_response == 'y' or auth_response == 'Y':
return
else:
sys.exit(1)

for data in collection_data:
if data['url'] == api_url:
url,method,headers,body = data['url'],data['method'],data['headers'],data['body']
if api_type == 'login':
if body:
body = json.loads(base64.b64decode(body))
login_token = self.fetch_logintoken(url,method,headers,body)
else:
login_token = self.fetch_logintoken(url,method,headers)

if login_token == True:
logs.logging.info("Login successfully : %s",url)
# calling auth_verify function to fetch logout request
logout_url, api_type = self.auth_verify(collection_data,"logout")
self.verify_logout(collection_data,logout_url)
auth_data = {'loginurl' : url, 'loginmethod' : method ,'loginheaders' : headers, 'loginbody' : body,'loginresult' : 'Y'}
for key,value in list(auth_data.items()):
update_value("login",key,value)
return

def parse_logindata(self,loginurl):
for data in self.parse_data.api_lst:
if loginurl == data['url']:
headers,method,body = data['headers'],data['method'],''
if data['body'] != '':
body = json.loads(base64.b64decode(data['body']))
return loginurl,headers,method,body
if any(name in url for name in api_types):
result = input(f"Is {api.capitalize()} URL correct? (y/n): {url} ")
if result.lower() == 'y':
return url, api
return None, None