diff --git a/examples/auth/interactive.py b/examples/auth/interactive.py index 290a8995..01270416 100644 --- a/examples/auth/interactive.py +++ b/examples/auth/interactive.py @@ -16,3 +16,5 @@ client = GraphClient.with_token_interactive(test_tenant, test_client_id) me = client.me.get().execute_query() print("Welcome, {0}!".format(me.given_name)) +site = client.sites.root.get().execute_query() +print("Site Url: {0}!".format(site.web_url)) diff --git a/examples/auth/with_user_creds.py b/examples/auth/with_user_creds.py index ac1ec80b..5fb2e5f9 100644 --- a/examples/auth/with_user_creds.py +++ b/examples/auth/with_user_creds.py @@ -4,26 +4,11 @@ https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication """ -import msal - from office365.graph_client import GraphClient -from tests import test_client_id, test_tenant, test_user_credentials - - -def acquire_token(): - authority_url = "https://login.microsoftonline.com/{0}".format(test_tenant) - app = msal.PublicClientApplication( - authority=authority_url, client_id=test_client_id - ) - - result = app.acquire_token_by_username_password( - username=test_user_credentials.userName, - password=test_user_credentials.password, - scopes=["https://graph.microsoft.com/.default"], - ) - return result - +from tests import test_client_id, test_password, test_tenant, test_username -client = GraphClient(acquire_token) +client = GraphClient.with_username_and_password( + test_tenant, test_client_id, test_username, test_password +) me = client.me.get().execute_query() print(me.user_principal_name) diff --git a/examples/auth/with_user_creds_custom.py b/examples/auth/with_user_creds_custom.py new file mode 100644 index 00000000..ac1ec80b --- /dev/null +++ b/examples/auth/with_user_creds_custom.py @@ -0,0 +1,29 @@ +""" +Username Password Authentication flow + +https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication +""" + +import msal + +from office365.graph_client import GraphClient +from tests import test_client_id, test_tenant, test_user_credentials + + +def acquire_token(): + authority_url = "https://login.microsoftonline.com/{0}".format(test_tenant) + app = msal.PublicClientApplication( + authority=authority_url, client_id=test_client_id + ) + + result = app.acquire_token_by_username_password( + username=test_user_credentials.userName, + password=test_user_credentials.password, + scopes=["https://graph.microsoft.com/.default"], + ) + return result + + +client = GraphClient(acquire_token) +me = client.me.get().execute_query() +print(me.user_principal_name) diff --git a/office365/graph_client.py b/office365/graph_client.py index 44c4d15e..7893eb82 100644 --- a/office365/graph_client.py +++ b/office365/graph_client.py @@ -55,7 +55,7 @@ from office365.outlook.calendar.rooms.list import RoomList from office365.planner.planner import Planner from office365.reports.root import ReportRoot -from office365.runtime.auth.token_response import TokenResponse +from office365.runtime.auth.entra.authentication_context import AuthenticationContext from office365.runtime.client_runtime_context import ClientRuntimeContext from office365.runtime.http.http_method import HttpMethod from office365.runtime.http.request_options import RequestOptions @@ -77,11 +77,16 @@ class GraphClient(ClientRuntimeContext): """Graph Service client""" - def __init__(self, acquire_token_callback): - # type: (Callable[[], dict]) -> None + def __init__(self, acquire_token_callback=None, auth_context=None): + # type: (Callable[[], dict], AuthenticationContext) -> None super(GraphClient, self).__init__() self._pending_request = None - self._acquire_token_callback = acquire_token_callback + if acquire_token_callback is not None: + self._auth_context = AuthenticationContext().with_access_token( + acquire_token_callback + ) + else: + self._auth_context = auth_context @staticmethod def with_certificate( @@ -98,27 +103,10 @@ def with_certificate( :param Any token_cache: Default cache is in memory only, Refer https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache """ - if scopes is None: - scopes = ["https://graph.microsoft.com/.default"] - authority_url = "https://login.microsoftonline.com/{0}".format(tenant) - import msal - - app = msal.ConfidentialClientApplication( - client_id, - authority=authority_url, - client_credential={ - "thumbprint": thumbprint, - "private_key": private_key, - }, - token_cache=token_cache, # Default cache is in memory only. - # You can learn how to use SerializableTokenCache from - # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache - ) - - def _acquire_token(): - return app.acquire_token_for_client(scopes=scopes) - - return GraphClient(_acquire_token) + auth_ctx = AuthenticationContext( + tenant=tenant, scopes=scopes, token_cache=token_cache + ).with_certificate(client_id, thumbprint, private_key) + return GraphClient(auth_context=auth_ctx) @staticmethod def with_client_secret( @@ -135,22 +123,11 @@ def with_client_secret( :param Any token_cache: Default cache is in memory only, Refer https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache """ - if scopes is None: - scopes = ["https://graph.microsoft.com/.default"] - authority_url = "https://login.microsoftonline.com/{0}".format(tenant) - import msal - - app = msal.ConfidentialClientApplication( - client_id, - authority=authority_url, - client_credential=client_secret, - token_cache=token_cache, - ) - def _acquire_token(): - return app.acquire_token_for_client(scopes=scopes) - - return GraphClient(_acquire_token) + auth_ctx = AuthenticationContext( + tenant=tenant, scopes=scopes, token_cache=token_cache + ).with_client_secret(client_id, client_secret) + return GraphClient(auth_context=auth_ctx) @staticmethod def with_token_interactive(tenant, client_id, username=None, scopes=None): @@ -164,32 +141,10 @@ def with_token_interactive(tenant, client_id, username=None, scopes=None): :param str username: Typically a UPN in the form of an email address. :param list[str] or None scopes: Scopes requested to access an API """ - if scopes is None: - scopes = ["https://graph.microsoft.com/.default"] - authority_url = "https://login.microsoftonline.com/{0}".format(tenant) - import msal - - app = msal.PublicClientApplication(client_id, authority=authority_url) - - def _acquire_token(): - # The pattern to acquire a token looks like this. - result = None - - # Firstly, check the cache to see if this end user has signed in before - accounts = app.get_accounts(username=username) - if accounts: - chosen = accounts[0] # Assuming the end user chose this one to proceed - # Now let's try to find a token in cache for this account - result = app.acquire_token_silent(scopes, account=chosen) - - if not result: - result = app.acquire_token_interactive( - scopes, - login_hint=username, - ) - return result - - return GraphClient(_acquire_token) + auth_ctx = AuthenticationContext( + tenant=tenant, scopes=scopes + ).with_token_interactive(client_id, username) + return GraphClient(auth_context=auth_ctx) @staticmethod def with_username_and_password(tenant, client_id, username, password, scopes=None): @@ -203,31 +158,10 @@ def with_username_and_password(tenant, client_id, username, password, scopes=Non :param str password: The password. :param list[str] or None scopes: Scopes requested to access an API """ - if scopes is None: - scopes = ["https://graph.microsoft.com/.default"] - authority_url = "https://login.microsoftonline.com/{0}".format(tenant) - import msal - - app = msal.PublicClientApplication( - authority=authority_url, - client_id=client_id, - ) - - def _acquire_token(): - result = None - accounts = app.get_accounts(username=username) - if accounts: - result = app.acquire_token_silent(scopes, account=accounts[0]) - - if not result: - result = app.acquire_token_by_username_password( - username=username, - password=password, - scopes=scopes, - ) - return result - - return GraphClient(_acquire_token) + auth_ctx = AuthenticationContext( + tenant=tenant, scopes=scopes + ).with_username_and_password(client_id, username, password) + return GraphClient(auth_context=auth_ctx) def execute_batch(self, items_per_batch=20, success_callback=None): """Constructs and submit a batch request @@ -238,7 +172,7 @@ def execute_batch(self, items_per_batch=20, success_callback=None): :param (List[ClientObject|ClientResult])-> None success_callback: A success callback """ batch_request = ODataV4BatchRequest(V4JsonFormat()) - batch_request.beforeExecute += self._authenticate_request + batch_request.beforeExecute += self._auth_context.authenticate_request while self.has_pending_request: qry = self._get_next_query(items_per_batch) batch_request.execute_query(qry) @@ -250,7 +184,9 @@ def pending_request(self): # type: () -> GraphRequest if self._pending_request is None: self._pending_request = GraphRequest() - self._pending_request.beforeExecute += self._authenticate_request + self._pending_request.beforeExecute += ( + self._auth_context.authenticate_request + ) self._pending_request.beforeExecute += self._build_specific_query return self._pending_request @@ -266,13 +202,6 @@ def _build_specific_query(self, request): elif isinstance(self.current_query, DeleteEntityQuery): request.method = HttpMethod.Delete - def _authenticate_request(self, request): - # type: (RequestOptions) -> None - """Authenticate request.""" - token_json = self._acquire_token_callback() - token = TokenResponse.from_json(token_json) - request.ensure_header("Authorization", "Bearer {0}".format(token.accessToken)) - @property def admin(self): """A container for administrator functionality for SharePoint and OneDrive.""" diff --git a/office365/runtime/auth/entra/__init__.py b/office365/runtime/auth/entra/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/office365/runtime/auth/entra/authentication_context.py b/office365/runtime/auth/entra/authentication_context.py new file mode 100644 index 00000000..2ccf15d0 --- /dev/null +++ b/office365/runtime/auth/entra/authentication_context.py @@ -0,0 +1,150 @@ +from typing import Optional + +from office365.runtime.auth.token_response import TokenResponse +from office365.runtime.http.request_options import RequestOptions + + +class AuthenticationContext(object): + """Provides authentication context for Microsoft Graph client""" + + def __init__(self, tenant=None, scopes=None, token_cache=None): + """ + :param str tenant: Tenant name, for example: contoso.onmicrosoft.com + :param list[str] or None scopes: Scopes requested to access an API + :param Any token_cache: Default cache is in memory only, + Refer https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache + """ + if tenant is not None: + self._authority_url = "https://login.microsoftonline.com/{0}".format(tenant) + if scopes is None: + scopes = ["https://graph.microsoft.com/.default"] + self._scopes = scopes + self._token_cache = token_cache + self._token_callback = None + + def with_access_token(self, token_callback): + self._token_callback = token_callback + return self + + def authenticate_request(self, request): + # type: (RequestOptions) -> None + """Authenticate request.""" + if not self._token_callback: + raise ValueError("Token callback is not set.") + token_json = self._token_callback() + token = TokenResponse.from_json(token_json) + request.ensure_header("Authorization", "Bearer {0}".format(token.accessToken)) + + def with_certificate(self, client_id, thumbprint, private_key): + """ + Initializes the confidential client with client certificate + + :param str client_id: The OAuth client id of the calling application. + :param str thumbprint: Thumbprint + :param str private_key: Private key + """ + import msal + + app = msal.ConfidentialClientApplication( + client_id, + authority=self._authority_url, + client_credential={ + "thumbprint": thumbprint, + "private_key": private_key, + }, + token_cache=self._token_cache, # Default cache is in memory only. + # You can learn how to use SerializableTokenCache from + # https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache + ) + + def _acquire_token(): + return app.acquire_token_for_client(scopes=self._scopes) + + return self.with_access_token(_acquire_token) + + def with_client_secret(self, client_id, client_secret): + # type: (str, str) -> "AuthenticationContext" + """ + Initializes the confidential client with client secret + + :param str client_id: The OAuth client id of the calling application. + :param str client_secret: Client secret + """ + import msal + + app = msal.ConfidentialClientApplication( + client_id, + authority=self._authority_url, + client_credential=client_secret, + token_cache=self._token_cache, + ) + + def _acquire_token(): + return app.acquire_token_for_client(scopes=self._scopes) + + return self.with_access_token(_acquire_token) + + def with_token_interactive(self, client_id, username=None): + # type: (str, Optional[str]) -> "AuthenticationContext" + """ + Initializes the client via user credentials + Note: only works if your app is registered with redirect_uri as http://localhost + + :param str client_id: The OAuth client id of the calling application. + :param str username: Typically a UPN in the form of an email address. + """ + import msal + + app = msal.PublicClientApplication(client_id, authority=self._authority_url) + + def _acquire_token(): + # The pattern to acquire a token looks like this. + result = None + + # Firstly, check the cache to see if this end user has signed in before + accounts = app.get_accounts(username=username) + if accounts: + chosen = accounts[0] # Assuming the end user chose this one to proceed + # Now let's try to find a token in cache for this account + result = app.acquire_token_silent(self._scopes, account=chosen) + + if not result: + result = app.acquire_token_interactive( + self._scopes, + login_hint=username, + ) + return result + + return self.with_access_token(_acquire_token) + + def with_username_and_password(self, client_id, username, password): + # type: (str, str, str) -> "AuthenticationContext" + """ + Initializes the client via user credentials + + :param str client_id: The OAuth client id of the calling application. + :param str username: Typically a UPN in the form of an email address. + :param str password: The password. + """ + import msal + + app = msal.PublicClientApplication( + authority=self._authority_url, + client_id=client_id, + ) + + def _acquire_token(): + result = None + accounts = app.get_accounts(username=username) + if accounts: + result = app.acquire_token_silent(self._scopes, account=accounts[0]) + + if not result: + result = app.acquire_token_by_username_password( + username=username, + password=password, + scopes=self._scopes, + ) + return result + + return self.with_access_token(_acquire_token) diff --git a/office365/teams/deleted.py b/office365/teams/deleted.py new file mode 100644 index 00000000..6ad43865 --- /dev/null +++ b/office365/teams/deleted.py @@ -0,0 +1,24 @@ +from office365.entity import Entity +from office365.runtime.paths.resource_path import ResourcePath +from office365.teams.channels.collection import ChannelCollection + + +class DeletedTeam(Entity): + """ + A deleted team in Microsoft Teams is a collection of channel objects. A channel represents a topic, + and therefore a logical isolation of discussion, within a deleted team. + + Every deleted team is associated with a Microsoft 365 group. For more information about working with groups + and members in teams, see Use the Microsoft Graph REST API to work with Microsoft Teams. + """ + + @property + def channels(self): + # type: () -> ChannelCollection + """The collection of channels & messages associated with the team.""" + return self.properties.get( + "channels", + ChannelCollection( + self.context, ResourcePath("channels", self.resource_path) + ), + ) diff --git a/office365/teams/work.py b/office365/teams/work.py new file mode 100644 index 00000000..9a59c145 --- /dev/null +++ b/office365/teams/work.py @@ -0,0 +1,35 @@ +from typing import Optional + +from office365.entity import Entity +from office365.entity_collection import EntityCollection +from office365.runtime.paths.resource_path import ResourcePath +from office365.teams.deleted import DeletedTeam + + +class Teamwork(Entity): + """A container for the range of Microsoft Teams functionalities that are available for the organization.""" + + @property + def is_teams_enabled(self): + # type: () -> Optional[bool] + """Indicates whether Microsoft Teams is enabled for the organization.""" + return self.properties.get("isTeamsEnabled", None) + + @property + def region(self): + # type: () -> Optional[str] + """Represents the region of the organization or the tenant. + The region value can be any region supported by the Teams payload""" + return self.properties.get("region", None) + + @property + def deleted_teams(self): + """The tags associated with the team.""" + return self.properties.get( + "deletedTeams", + EntityCollection( + self.context, + DeletedTeam, + ResourcePath("deletedTeams", self.resource_path), + ), + ) diff --git a/tests/onedrive/test_excel_charts.py b/tests/onedrive/test_excel_charts.py index b4ec0044..839948e9 100644 --- a/tests/onedrive/test_excel_charts.py +++ b/tests/onedrive/test_excel_charts.py @@ -1,3 +1,5 @@ +import os + from office365.onedrive.driveitems.driveItem import DriveItem from office365.onedrive.workbooks.charts.chart import WorkbookChart from office365.onedrive.workbooks.worksheets.worksheet import WorkbookWorksheet @@ -12,7 +14,9 @@ class TestExcelCharts(GraphTestCase): @classmethod def setUpClass(cls): super(TestExcelCharts, cls).setUpClass() - path = "../../examples/data/templates/Weight loss tracker.xlsx" + path = "{0}/../../examples/data/templates/Weight loss tracker.xlsx".format( + os.path.dirname(__file__) + ) cls.excel_file = cls.client.me.drive.root.upload_file(path).execute_query() assert cls.excel_file.resource_path is not None cls.worksheet = ( diff --git a/tests/onedrive/test_excel_tables.py b/tests/onedrive/test_excel_tables.py index 0cdc5452..408d3a34 100644 --- a/tests/onedrive/test_excel_tables.py +++ b/tests/onedrive/test_excel_tables.py @@ -1,3 +1,5 @@ +import os + from examples.sharepoint.lists.assessment.broken_tax_field_value import fields from office365.onedrive.driveitems.driveItem import DriveItem from office365.onedrive.workbooks.sort_field import WorkbookSortField @@ -14,7 +16,7 @@ class TestExcelTables(GraphTestCase): @classmethod def setUpClass(cls): super(TestExcelTables, cls).setUpClass() - path = "../data/Financial Sample.xlsx" + path = "{0}/../data/Financial Sample.xlsx".format(os.path.dirname(__file__)) cls.excel_file = cls.client.me.drive.root.upload_file(path).execute_query() assert cls.excel_file.resource_path is not None cls.worksheet = (