Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions python_picnic_api/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .session import PicnicAPISession, PicnicAuthError
from hashlib import md5

from .helper import _tree_generator, _url_generator
from .session import PicnicAPISession, PicnicAuthError

DEFAULT_URL = "https://storefront-prod.{}.picnicinternational.com/api/{}"
DEFAULT_COUNTRY_CODE = "NL"
Expand All @@ -8,17 +10,19 @@

class PicnicAPI:
def __init__(
self, username: str, password: str, country_code: str = DEFAULT_COUNTRY_CODE
self, username: str = None, password: str = None,
country_code: str = DEFAULT_COUNTRY_CODE, auth_token: str = None
):
self._username = username
self._password = password
self._country_code = country_code
self._base_url = _url_generator(
DEFAULT_URL, self._country_code, DEFAULT_API_VERSION
)

self.session = PicnicAPISession()
self.session.login(self._username, self._password, self._base_url)
self.session = PicnicAPISession(auth_token=auth_token)

# Login if not authenticated
if not self.session.authenticated and username and password:
self.login(username, password)

def _get(self, path: str, add_picnic_headers=False):
url = self._base_url + path
Expand All @@ -40,13 +44,27 @@ def _post(self, path: str, data=None):
response = self.session.post(url, json=data).json()

if self._contains_auth_error(response):
raise PicnicAuthError("Picnic authentication error")
raise PicnicAuthError(f"Picnic authentication error: {response['error'].get('message')}")

return response

@staticmethod
def _contains_auth_error(response):
return isinstance(response, dict) and response.setdefault('error', {}).get('code') == 'AUTH_ERROR'
if not isinstance(response, dict):
return False

error_code = response.setdefault("error", {}).get("code")
return error_code == "AUTH_ERROR" or error_code == "AUTH_INVALID_CRED"

def login(self, username: str, password: str):
path = "/user/login"
secret = md5(password.encode("utf-8")).hexdigest()
data = {"key": username, "secret": secret, "client_id": 1}

return self._post(path, data)

def logged_in(self):
return self.session.authenticated

def get_user(self):
return self._get("/user")
Expand Down
49 changes: 30 additions & 19 deletions python_picnic_api/session.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,54 @@
from hashlib import md5
from requests import Session
from requests import Response, Session


class PicnicAuthError(Exception):
"""Indicates an error when authenticating to the Picnic API."""


class PicnicAPISession(Session):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
AUTH_HEADER = "x-picnic-auth"

def __init__(self, auth_token: str = None):
super().__init__()
self._auth_token = auth_token

self.headers.update(
{
"User-Agent": "okhttp/3.9.0",
"Content-Type": "application/json; charset=UTF-8",
self.AUTH_HEADER: self._auth_token
}
)

def login(self, username: str, password: str, base_url: str):
"""Login function for the Picnic API.
@property
def authenticated(self):
"""Returns whether the user is authenticated by checking if the authentication token is set."""
return bool(self._auth_token)

Args:
username (str): username, usualy your email.
password (str): password.
"""
@property
def auth_token(self):
"""Returns the auth token."""
return self._auth_token

if "x-picnic-auth" in self.headers:
self.headers.pop("x-picnic-auth", None)
def _update_auth_token(self, auth_token):
"""Update the auth token if not None and changed."""
if auth_token and auth_token != self._auth_token:
self._auth_token = auth_token
self.headers.update({self.AUTH_HEADER: self._auth_token})

url = base_url + "/user/login"
def get(self, url, **kwargs) -> Response:
"""Do a GET request and update the auth token if set."""
response = super(PicnicAPISession, self).get(url, **kwargs)
self._update_auth_token(response.headers.get(self.AUTH_HEADER))

secret = md5(password.encode("utf-8")).hexdigest()
data = {"key": username, "secret": secret, "client_id": 1}
return response

response = self.post(url, json=data)
if "x-picnic-auth" not in response.headers:
raise PicnicAuthError("Could not authenticate against Picnic API")
def post(self, url, data=None, json=None, **kwargs) -> Response:
"""Do a POST request and update the auth token if set."""
response = super(PicnicAPISession, self).post(url, data, json, **kwargs)
self._update_auth_token(response.headers.get(self.AUTH_HEADER))

self.headers.update({"x-picnic-auth": response.headers["x-picnic-auth"]})
return response


__all__ = ["PicnicAuthError", "PicnicAPISession"]
26 changes: 26 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ def setUp(self) -> None:
def tearDown(self) -> None:
self.session_patcher.stop()

def test_login_credentials(self):
self.session_mock().authenticated = False
PicnicAPI(username='[email protected]', password='test')
self.session_mock().post.assert_called_with(
self.expected_base_url + '/user/login',
json={'key': '[email protected]', 'secret': '098f6bcd4621d373cade4e832627b4f6', "client_id": 1}
)

def test_login_auth_token(self):
self.session_mock().authenticated = True
PicnicAPI(username='[email protected]', password='test', auth_token='a3fwo7f3h78kf3was7h8f3ahf3ah78f3')
self.session_mock().login.assert_not_called()

def test_login_failed(self):
response = {
"error": {
"code": "AUTH_INVALID_CRED",
"message": "Invalid credentials."
}
}
self.session_mock().post.return_value = self.MockResponse(response, 200)

client = PicnicAPI()
with self.assertRaises(PicnicAuthError):
client.login('test-user', 'test-password')

def test_get_user(self):
response = {
"user_id": "594-241-3623",
Expand Down
57 changes: 38 additions & 19 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,26 @@
import unittest
from hashlib import md5
from unittest.mock import patch

from python_picnic_api.session import PicnicAPISession, PicnicAuthError
from requests import Session

from python_picnic_api.session import PicnicAPISession


class TestSession(unittest.TestCase):
class MockResponse:
def __init__(self, headers):
self.headers = headers

def setUp(self) -> None:
self.picnic_session = PicnicAPISession()

@patch.object(PicnicAPISession, 'post')
def test_login(self, post_mock):
@patch.object(Session, "post")
def test_update_auth_token(self, post_mock):
"""Test that the initial auth-token is saved."""
post_mock.return_value = self.MockResponse({
"x-picnic-auth": "3p9fqahw3uehfaw9fh8aw3ufaw389fpawhuo3fa"
})

self.picnic_session.login("[email protected]", "test-password", "https://picnic.app")

post_mock.assert_called_with(
'https://picnic.app/user/login',
json={"key": "[email protected]", "secret": md5("test-password".encode("utf-8")).hexdigest(), "client_id": 1}
)
self.assertDictEqual(dict(self.picnic_session.headers), {
picnic_session = PicnicAPISession()
picnic_session.post("https://picnic.app/user/login", json={"test": "data"})
self.assertDictEqual(dict(picnic_session.headers), {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
Expand All @@ -34,9 +29,33 @@ def test_login(self, post_mock):
"x-picnic-auth": "3p9fqahw3uehfaw9fh8aw3ufaw389fpawhuo3fa"
})

@patch.object(PicnicAPISession, 'post')
def test_login_failed(self, post_mock):
post_mock.return_value = self.MockResponse({})
@patch.object(Session, "post")
def test_update_auth_token_refresh(self, post_mock):
"""Test that the auth-token is updated if a new one is given in the response headers."""
post_mock.return_value = self.MockResponse({
"x-picnic-auth": "renewed-auth-token"
})

picnic_session = PicnicAPISession(auth_token="initial-auth-token")
self.assertEqual(picnic_session.auth_token, "initial-auth-token")

picnic_session.post("https://picnic.app", json={"test": "data"})
self.assertEqual(picnic_session.auth_token, "renewed-auth-token")

self.assertDictEqual(dict(picnic_session.headers), {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
"User-Agent": "okhttp/3.9.0",
"Content-Type": "application/json; charset=UTF-8",
"x-picnic-auth": "renewed-auth-token"
})

def test_authenticated_with_auth_token(self):
picnic_session = PicnicAPISession(auth_token=None)
self.assertFalse(picnic_session.authenticated)
self.assertIsNone(picnic_session.headers[picnic_session.AUTH_HEADER])

with self.assertRaises(PicnicAuthError):
self.picnic_session.login('[email protected]', 'test-password', 'https://picnic.app')
picnic_session = PicnicAPISession(auth_token="3p9aw8fhzsefaw29f38h7p3fwuefah37f8kwg3i")
self.assertTrue(picnic_session.authenticated)
self.assertEqual(picnic_session.headers[picnic_session.AUTH_HEADER], "3p9aw8fhzsefaw29f38h7p3fwuefah37f8kwg3i")