Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Allow for saving the refreshed auth-token when present in the respons…
…e headers
  • Loading branch information
corneyl committed Mar 16, 2021
commit 1d313834f146000ada961b228e9cf225c6ab19dd
20 changes: 15 additions & 5 deletions python_picnic_api/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from hashlib import md5

from .helper import _tree_generator, _url_generator
from .session import PicnicAPISession, PicnicAuthError

Expand All @@ -19,7 +21,7 @@ def __init__(
self.session = PicnicAPISession(auth_token=auth_token)

# Login if not authenticated
if not self.session.authenticated() and username and password:
if not self.session.authenticated and username and password:
self.login(username, password)

def _get(self, path: str, add_picnic_headers=False):
Expand All @@ -42,19 +44,27 @@ def _post(self, path: str, data=None):
response = self.session.post(url, json=data).json()

if self._contains_auth_error(response):
raise PicnicAuthError("Picnic authentication error")
raise PicnicAuthError(f"Picnic authentication error: {response['error'].get('message')}")

return response

@staticmethod
def _contains_auth_error(response):
return isinstance(response, dict) and response.setdefault('error', {}).get('code') == 'AUTH_ERROR'
if not isinstance(response, dict):
return False

error_code = response.setdefault("error", {}).get("code")
return error_code == "AUTH_ERROR" or error_code == "AUTH_INVALID_CRED"

def login(self, username: str, password: str):
return self.session.login(username, password, self._base_url)
path = "/user/login"
secret = md5(password.encode("utf-8")).hexdigest()
data = {"key": username, "secret": secret, "client_id": 1}

return self._post(path, data)

def logged_in(self):
return self.session.authenticated()
return self.session.authenticated

def get_user(self):
return self._get("/user")
Expand Down
56 changes: 27 additions & 29 deletions python_picnic_api/session.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,54 @@
from hashlib import md5
from requests import Session
from requests import Response, Session


class PicnicAuthError(Exception):
"""Indicates an error when authenticating to the Picnic API."""


class PicnicAPISession(Session):

AUTH_HEADER = "x-picnic-auth"

def __init__(self, auth_token=None):
def __init__(self, auth_token: str = None):
super().__init__()
self.auth_token = auth_token
self._auth_token = auth_token

self.headers.update(
{
"User-Agent": "okhttp/3.9.0",
"Content-Type": "application/json; charset=UTF-8",
self.AUTH_HEADER: self.auth_token
self.AUTH_HEADER: self._auth_token
}
)

def login(self, username: str, password: str, base_url: str):
"""Login function for the Picnic API.

Args:
username (str): username, usually your email.
password (str): password.
base_url (str): The base url for doing requests
"""

if "x-picnic-auth" in self.headers:
self.headers.pop("x-picnic-auth", None)
@property
def authenticated(self):
"""Returns whether the user is authenticated by checking if the authentication token is set."""
return bool(self._auth_token)

url = base_url + "/user/login"
@property
def auth_token(self):
"""Returns the auth token."""
return self._auth_token

secret = md5(password.encode("utf-8")).hexdigest()
data = {"key": username, "secret": secret, "client_id": 1}
def _update_auth_token(self, auth_token):
"""Update the auth token if not None and changed."""
if auth_token and auth_token != self._auth_token:
self._auth_token = auth_token
self.headers.update({self.AUTH_HEADER: self._auth_token})

response = self.post(url, json=data)
if self.AUTH_HEADER not in response.headers:
raise PicnicAuthError("Could not authenticate against Picnic API")
def get(self, url, **kwargs) -> Response:
"""Do a GET request and update the auth token if set."""
response = super(PicnicAPISession, self).get(url, **kwargs)
self._update_auth_token(response.headers.get(self.AUTH_HEADER))

self.auth_token = response.headers[self.AUTH_HEADER]
self.headers.update({self.AUTH_HEADER: self.auth_token})
return response

return self.auth_token
def post(self, url, data=None, json=None, **kwargs) -> Response:
"""Do a POST request and update the auth token if set."""
response = super(PicnicAPISession, self).post(url, data, json, **kwargs)
self._update_auth_token(response.headers.get(self.AUTH_HEADER))

def authenticated(self):
"""Returns if the user is authenticated by checking if the authentication token is set."""
return bool(self.auth_token)
return response


__all__ = ["PicnicAuthError", "PicnicAPISession"]
30 changes: 23 additions & 7 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,31 @@ def tearDown(self) -> None:
self.session_patcher.stop()

def test_login_credentials(self):
self.session_mock().authenticated.return_value = False
self.session_mock().authenticated = False
PicnicAPI(username='[email protected]', password='test')
self.session_mock().login.assert_called_with('[email protected]', 'test', self.expected_base_url)
self.session_mock().post.assert_called_with(
self.expected_base_url + '/user/login',
json={'key': '[email protected]', 'secret': '098f6bcd4621d373cade4e832627b4f6', "client_id": 1}
)

def test_login_auth_token(self):
self.session_mock().authenticated.return_value = True
self.session_mock().authenticated = True
PicnicAPI(username='[email protected]', password='test', auth_token='a3fwo7f3h78kf3was7h8f3ahf3ah78f3')
self.session_mock().login.assert_not_called()

def test_login_failed(self):
response = {
"error": {
"code": "AUTH_INVALID_CRED",
"message": "Invalid credentials."
}
}
self.session_mock().post.return_value = self.MockResponse(response, 200)

client = PicnicAPI()
with self.assertRaises(PicnicAuthError):
client.login('test-user', 'test-password')

def test_get_user(self):
response = {
"user_id": "594-241-3623",
Expand Down Expand Up @@ -151,10 +167,10 @@ def test_get_categories(self):
self.session_mock().get.assert_called_with(self.expected_base_url + '/my_store?depth=0', headers=None)

self.assertDictEqual(categories[0], {
"type": "CATEGORY",
"id": "purchases",
"name": "Besteld",
})
"type": "CATEGORY",
"id": "purchases",
"name": "Besteld",
})

def test_get_auth_exception(self):
self.session_mock().get.return_value = self.MockResponse({
Expand Down
56 changes: 33 additions & 23 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,26 @@
import unittest
from hashlib import md5
from unittest.mock import patch

from python_picnic_api.session import PicnicAPISession, PicnicAuthError
from requests import Session

from python_picnic_api.session import PicnicAPISession


class TestSession(unittest.TestCase):
class MockResponse:
def __init__(self, headers):
self.headers = headers

def setUp(self) -> None:
self.picnic_session = PicnicAPISession()

@patch.object(PicnicAPISession, 'post')
def test_login(self, post_mock):
@patch.object(Session, "post")
def test_update_auth_token(self, post_mock):
"""Test that the initial auth-token is saved."""
post_mock.return_value = self.MockResponse({
"x-picnic-auth": "3p9fqahw3uehfaw9fh8aw3ufaw389fpawhuo3fa"
})

self.picnic_session.login("[email protected]", "test-password", "https://picnic.app")

post_mock.assert_called_with(
'https://picnic.app/user/login',
json={"key": "[email protected]", "secret": md5("test-password".encode("utf-8")).hexdigest(), "client_id": 1}
)
self.assertDictEqual(dict(self.picnic_session.headers), {
picnic_session = PicnicAPISession()
picnic_session.post("https://picnic.app/user/login", json={"test": "data"})
self.assertDictEqual(dict(picnic_session.headers), {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
Expand All @@ -34,18 +29,33 @@ def test_login(self, post_mock):
"x-picnic-auth": "3p9fqahw3uehfaw9fh8aw3ufaw389fpawhuo3fa"
})

@patch.object(PicnicAPISession, 'post')
def test_login_failed(self, post_mock):
post_mock.return_value = self.MockResponse({})
@patch.object(Session, "post")
def test_update_auth_token_refresh(self, post_mock):
"""Test that the auth-token is updated if a new one is given in the response headers."""
post_mock.return_value = self.MockResponse({
"x-picnic-auth": "renewed-auth-token"
})

picnic_session = PicnicAPISession(auth_token="initial-auth-token")
self.assertEqual(picnic_session.auth_token, "initial-auth-token")

picnic_session.post("https://picnic.app", json={"test": "data"})
self.assertEqual(picnic_session.auth_token, "renewed-auth-token")

with self.assertRaises(PicnicAuthError):
self.picnic_session.login('[email protected]', 'test-password', 'https://picnic.app')
self.assertDictEqual(dict(picnic_session.headers), {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
"User-Agent": "okhttp/3.9.0",
"Content-Type": "application/json; charset=UTF-8",
"x-picnic-auth": "renewed-auth-token"
})

def test_authenticated_with_auth_token(self):
picnic_session = PicnicAPISession(auth_token=None)
self.assertFalse(picnic_session.authenticated())
self.assertFalse(picnic_session.authenticated)
self.assertIsNone(picnic_session.headers[picnic_session.AUTH_HEADER])

picnic_session = PicnicAPISession(auth_token='3p9aw8fhzsefaw29f38h7p3fwuefah37f8kwg3i')
self.assertTrue(picnic_session.authenticated())
self.assertEqual(picnic_session.headers[picnic_session.AUTH_HEADER], '3p9aw8fhzsefaw29f38h7p3fwuefah37f8kwg3i')
picnic_session = PicnicAPISession(auth_token="3p9aw8fhzsefaw29f38h7p3fwuefah37f8kwg3i")
self.assertTrue(picnic_session.authenticated)
self.assertEqual(picnic_session.headers[picnic_session.AUTH_HEADER], "3p9aw8fhzsefaw29f38h7p3fwuefah37f8kwg3i")