Merge branch 'master' into proxy-to-alpha

This commit is contained in:
Martyn Inglis
2016-01-19 13:49:54 +00:00
21 changed files with 480 additions and 276 deletions

View File

@@ -1,6 +1,6 @@
from client.authentication import create_jwt_token
from app.dao.tokens_dao import get_unsigned_token
from app.dao.api_key_dao import get_unsigned_secret
def create_authorization_header(service_id, path, method, request_body=None):
@@ -8,14 +8,14 @@ def create_authorization_header(service_id, path, method, request_body=None):
token = create_jwt_token(
request_method=method,
request_path=path,
secret=get_unsigned_token(service_id),
secret=get_unsigned_secret(service_id),
client_id=service_id,
request_body=request_body)
else:
token = create_jwt_token(request_method=method,
request_path=path,
secret=get_unsigned_token(service_id),
secret=get_unsigned_secret(service_id),
client_id=service_id)
return 'Authorization', 'Bearer {}'.format(token)

View File

@@ -1,7 +1,7 @@
from client.authentication import create_jwt_token
from flask import json, url_for
from app.dao.tokens_dao import get_unsigned_token
from app.dao.api_key_dao import get_unsigned_secret
def test_should_not_allow_request_with_no_token(notify_api):
@@ -33,13 +33,13 @@ def test_should_not_allow_request_with_incorrect_token(notify_api):
assert data['error'] == 'Invalid token: signature'
def test_should_not_allow_incorrect_path(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_incorrect_path(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = create_jwt_token(request_method="GET",
request_path="/bad",
secret=get_unsigned_token(sample_token.service_id),
client_id=sample_token.service_id)
secret=get_unsigned_secret(sample_api_key.service_id),
client_id=sample_api_key.service_id)
response = client.get(url_for('status.show_status'),
headers={'Authorization': "Bearer {}".format(token)})
assert response.status_code == 403
@@ -47,10 +47,10 @@ def test_should_not_allow_incorrect_path(notify_api, notify_db, notify_db_sessio
assert data['error'] == 'Invalid token: request'
def test_should_not_allow_incorrect_method(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_incorrect_method(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_post_token(sample_token.service_id, {})
token = __create_post_token(sample_api_key.service_id, {})
response = client.get(url_for('status.show_status'),
headers={'Authorization': "Bearer {}".format(token)})
assert response.status_code == 403
@@ -58,11 +58,11 @@ def test_should_not_allow_incorrect_method(notify_api, notify_db, notify_db_sess
assert data['error'] == 'Invalid token: request'
def test_should_not_allow_invalid_secret(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_invalid_secret(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = create_jwt_token(request_method="POST", request_path="/", secret="not-so-secret",
client_id=sample_token.service_id)
client_id=sample_api_key.service_id)
response = client.get(url_for('status.show_status'),
headers={'Authorization': "Bearer {}".format(token)})
assert response.status_code == 403
@@ -70,10 +70,10 @@ def test_should_not_allow_invalid_secret(notify_api, notify_db, notify_db_sessio
assert data['error'] == 'Invalid token: signature'
def test_should_allow_valid_token(notify_api, notify_db, notify_db_session, sample_token):
def test_should_allow_valid_token(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_get_token(sample_token.service_id)
token = __create_get_token(sample_api_key.service_id)
response = client.get(url_for('status.show_status'),
headers={'Authorization': 'Bearer {}'.format(token)})
assert response.status_code == 200
@@ -86,20 +86,20 @@ JSON_BODY = json.dumps({
})
def test_should_allow_valid_token_with_post_body(notify_api, notify_db, notify_db_session, sample_token):
def test_should_allow_valid_token_with_post_body(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_post_token(sample_token.service_id, JSON_BODY)
token = __create_post_token(sample_api_key.service_id, JSON_BODY)
response = client.post(url_for('status.show_status'),
data=JSON_BODY,
headers={'Authorization': 'Bearer {}'.format(token)})
assert response.status_code == 200
def test_should_not_allow_valid_token_with_invalid_post_body(notify_api, notify_db, notify_db_session, sample_token):
def test_should_not_allow_valid_token_with_invalid_post_body(notify_api, notify_db, notify_db_session, sample_api_key):
with notify_api.test_request_context():
with notify_api.test_client() as client:
token = __create_post_token(sample_token.service_id, JSON_BODY)
token = __create_post_token(sample_api_key.service_id, JSON_BODY)
response = client.post(url_for('status.show_status'),
data="spurious",
headers={'Authorization': 'Bearer {}'.format(token)})
@@ -111,7 +111,7 @@ def test_should_not_allow_valid_token_with_invalid_post_body(notify_api, notify_
def __create_get_token(service_id):
return create_jwt_token(request_method="GET",
request_path=url_for('status.show_status'),
secret=get_unsigned_token(service_id),
secret=get_unsigned_secret(service_id),
client_id=service_id)
@@ -119,7 +119,7 @@ def __create_post_token(service_id, request_body):
return create_jwt_token(
request_method="POST",
request_path=url_for('status.show_status'),
secret=get_unsigned_token(service_id),
secret=get_unsigned_secret(service_id),
client_id=service_id,
request_body=request_body
)

View File

@@ -1,9 +1,9 @@
import pytest
from app.models import (User, Service, Template, Token, Job)
from app.models import (User, Service, Template, ApiKey, Job)
from app.dao.users_dao import (save_model_user)
from app.dao.services_dao import save_model_service
from app.dao.templates_dao import save_model_template
from app.dao.tokens_dao import save_model_token
from app.dao.api_key_dao import save_model_api_key
from app.dao.jobs_dao import save_job
import uuid
@@ -12,7 +12,14 @@ import uuid
def sample_user(notify_db,
notify_db_session,
email="notify@digital.cabinet-office.gov.uk"):
user = User(**{'email_address': email})
data = {
'name': 'Test User',
'email_address': email,
'password': 'password',
'mobile_number': '+44 7700 900986',
'state': 'active'
}
user = User(**data)
save_model_user(user)
return user
@@ -44,7 +51,7 @@ def sample_template(notify_db,
service=None):
if service is None:
service = sample_service(notify_db, notify_db_session)
sample_token(notify_db, notify_db_session, service=service)
sample_api_key(notify_db, notify_db_session, service=service)
data = {
'name': template_name,
'template_type': template_type,
@@ -57,15 +64,15 @@ def sample_template(notify_db,
@pytest.fixture(scope='function')
def sample_token(notify_db,
notify_db_session,
service=None):
def sample_api_key(notify_db,
notify_db_session,
service=None):
if service is None:
service = sample_service(notify_db, notify_db_session)
data = {'service_id': service.id}
token = Token(**data)
save_model_token(token)
return token
data = {'service_id': service.id, 'name': service.name}
api_key = ApiKey(**data)
save_model_api_key(api_key)
return api_key
@pytest.fixture(scope='function')
@@ -98,7 +105,7 @@ def sample_job(notify_db,
def sample_admin_service_id(notify_db, notify_db_session):
admin_user = sample_user(notify_db, notify_db_session, email="notify_admin@digital.cabinet-office.gov.uk")
admin_service = sample_service(notify_db, notify_db_session, service_name="Sample Admin Service", user=admin_user)
data = {'service_id': admin_service.id}
token = Token(**data)
save_model_token(token)
data = {'service': admin_service, 'name': 'sample admin key'}
api_key = ApiKey(**data)
save_model_api_key(api_key)
return admin_service.id

View File

@@ -0,0 +1,69 @@
from datetime import datetime
from pytest import fail
from sqlalchemy.orm.exc import NoResultFound
from app.dao.api_key_dao import (save_model_api_key,
get_model_api_keys,
get_unsigned_secret,
_generate_secret,
_get_secret)
from app.models import ApiKey
def test_secret_is_signed_and_can_be_read_again(notify_api):
import uuid
with notify_api.test_request_context():
token = str(uuid.uuid4())
signed_secret = _generate_secret(token=token)
assert token == _get_secret(signed_secret)
assert signed_secret != token
def test_save_api_key_should_create_new_api_key(notify_api, notify_db, notify_db_session, sample_service):
api_key = ApiKey(**{'service_id': sample_service.id, 'name': sample_service.name})
save_model_api_key(api_key)
all_api_keys = get_model_api_keys()
assert len(all_api_keys) == 1
assert all_api_keys[0] == api_key
def test_save_api_key_should_update_the_api_key(notify_api, notify_db, notify_db_session, sample_api_key):
now = datetime.utcnow()
saved_api_key = get_model_api_keys(sample_api_key.service_id)
save_model_api_key(saved_api_key, update_dict={'id': saved_api_key.id, 'expiry_date': now})
all_api_keys = get_model_api_keys()
assert len(all_api_keys) == 1
assert all_api_keys[0].expiry_date == now
assert all_api_keys[0].secret == saved_api_key.secret
assert all_api_keys[0].id == saved_api_key.id
assert all_api_keys[0].service_id == saved_api_key.service_id
def test_get_api_key_should_raise_exception_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
try:
get_model_api_keys(sample_service.id)
fail("Should have thrown a NoResultFound exception")
except NoResultFound:
pass
def test_get_api_key_should_return_none_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
assert get_model_api_keys(service_id=sample_service.id, raise_=False) is None
def test_should_return_api_key_for_service(notify_api, notify_db, notify_db_session, sample_api_key):
api_key = get_model_api_keys(sample_api_key.service_id)
assert api_key == sample_api_key
def test_should_return_unsigned_api_key_for_service_id(notify_api,
notify_db,
notify_db_session,
sample_api_key):
unsigned_api_key = get_unsigned_secret(sample_api_key.service_id)
assert sample_api_key.secret != unsigned_api_key
assert unsigned_api_key == _get_secret(sample_api_key.secret)

View File

@@ -1,62 +0,0 @@
import uuid
from app.dao.tokens_dao import (save_model_token, get_model_tokens, get_unsigned_token, _generate_token, _get_token)
from datetime import datetime
from app.models import Token
from pytest import fail
from sqlalchemy.orm.exc import NoResultFound
def test_token_is_signed_and_can_be_read_again(notify_api):
import uuid
with notify_api.test_request_context():
token = str(uuid.uuid4())
signed_token = _generate_token(token=token)
assert token == _get_token(signed_token)
assert signed_token != token
def test_save_token_should_create_new_token(notify_api, notify_db, notify_db_session, sample_service):
api_token = Token(**{'service_id': sample_service.id})
save_model_token(api_token)
all_tokens = get_model_tokens()
assert len(all_tokens) == 1
assert all_tokens[0] == api_token
def test_save_token_should_update_the_token(notify_api, notify_db, notify_db_session, sample_token):
now = datetime.utcnow()
saved_token = get_model_tokens(sample_token.service_id)
save_model_token(saved_token, update_dict={'id': saved_token.id, 'expiry_date': now})
all_tokens = get_model_tokens()
assert len(all_tokens) == 1
assert all_tokens[0].expiry_date == now
assert all_tokens[0].token == saved_token.token
assert all_tokens[0].id == saved_token.id
assert all_tokens[0].service_id == saved_token.service_id
def test_get_token_should_raise_exception_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
try:
get_model_tokens(sample_service.id)
fail("Should have thrown a NoResultFound exception")
except NoResultFound:
pass
def test_get_token_should_return_none_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service):
assert get_model_tokens(service_id=sample_service.id, raise_=False) is None
def test_should_return_token_for_service(notify_api, notify_db, notify_db_session, sample_token):
token = get_model_tokens(sample_token.service_id)
assert token == sample_token
def test_should_return_unsigned_token_for_service_id(notify_api, notify_db, notify_db_session,
sample_token):
unsigned_token = get_unsigned_token(sample_token.service_id)
assert sample_token.token != unsigned_token
assert unsigned_token == _get_token(sample_token.token)

View File

@@ -8,7 +8,13 @@ from app.models import User
def test_create_user(notify_api, notify_db, notify_db_session):
email = 'notify@digital.cabinet-office.gov.uk'
user = User(**{'email_address': email})
data = {
'name': 'Test User',
'email_address': email,
'password': 'password',
'mobile_number': '+44 7700 900986'
}
user = User(**data)
save_model_user(user)
assert User.query.count() == 1
assert User.query.first().email_address == email

View File

@@ -1,7 +1,7 @@
import json
from flask import url_for
from app.dao.services_dao import save_model_service
from app.models import (Service, Token, Template)
from app.models import (Service, ApiKey, Template)
from tests import create_authorization_header
from tests.app.conftest import sample_user as create_sample_user
@@ -70,7 +70,6 @@ def test_post_service(notify_api, notify_db, notify_db_session, sample_user, sam
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data']['name'] == service.name
assert json_resp['data']['limit'] == service.limit
assert json_resp['token'] is not None
def test_post_service_multiple_users(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
@@ -311,76 +310,84 @@ def test_delete_service_not_exists(notify_api, notify_db, notify_db_session, sam
assert Service.query.count() == 2
def test_renew_token_should_return_token_when_service_does_not_have_a_valid_token(notify_api, notify_db,
notify_db_session,
sample_service,
sample_admin_service_id):
def test_renew_api_key_should_create_new_api_key_for_service(notify_api, notify_db,
notify_db_session,
sample_service,
sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = {'name': 'some secret name'}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.renew_token', service_id=sample_service.id),
method='POST')
response = client.post(url_for('service.renew_token', service_id=sample_service.id),
path=url_for('service.renew_api_key',
service_id=sample_service.id),
method='POST',
request_body=json.dumps(data))
response = client.post(url_for('service.renew_api_key', service_id=sample_service.id),
data=json.dumps(data),
headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 201
assert response.get_data is not None
saved_token = Token.query.filter_by(service_id=sample_service.id).first()
assert saved_token.service_id == sample_service.id
saved_api_key = ApiKey.query.filter_by(service_id=sample_service.id).first()
assert saved_api_key.service_id == sample_service.id
assert saved_api_key.name == 'some secret name'
def test_renew_token_should_expire_the_old_token_and_create_a_new_token(notify_api, notify_db, notify_db_session,
sample_token, sample_admin_service_id):
def test_renew_api_key_should_expire_the_old_api_key_and_create_a_new_api_key(notify_api, notify_db, notify_db_session,
sample_api_key, sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Token.query.count() == 2
assert ApiKey.query.count() == 2
data = {'name': 'some secret name'}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.renew_token',
service_id=sample_token.service_id),
method='POST')
path=url_for('service.renew_api_key',
service_id=sample_api_key.service_id),
method='POST',
request_body=json.dumps(data))
response = client.post(url_for('service.renew_token', service_id=sample_token.service_id),
response = client.post(url_for('service.renew_api_key', service_id=sample_api_key.service_id),
data=json.dumps(data),
headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 201
assert Token.query.count() == 3
all_tokens = Token.query.filter_by(service_id=sample_token.service_id).all()
for x in all_tokens:
if x.id == sample_token.id:
assert ApiKey.query.count() == 3
all_api_keys = ApiKey.query.filter_by(service_id=sample_api_key.service_id).all()
for x in all_api_keys:
if x.id == sample_api_key.id:
assert x.expiry_date is not None
else:
assert x.expiry_date is None
assert x.token is not sample_token.token
assert x.secret is not sample_api_key.secret
def test_create_token_should_return_error_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service, sample_admin_service_id):
def test_renew_api_key_should_return_error_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
sample_service, sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.renew_token', service_id="123"),
path=url_for('service.renew_api_key', service_id="123"),
method='POST')
response = client.post(url_for('service.renew_token', service_id=123),
response = client.post(url_for('service.renew_api_key', service_id=123),
headers=[('Content-Type', 'application/json'), auth_header])
assert response.status_code == 404
def test_revoke_token_should_expire_token_for_service(notify_api, notify_db, notify_db_session,
sample_token, sample_admin_service_id):
def test_revoke_api_key_should_expire_api_key_for_service(notify_api, notify_db, notify_db_session,
sample_api_key, sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert Token.query.count() == 2
assert ApiKey.query.count() == 2
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('service.revoke_token',
service_id=sample_token.service_id),
path=url_for('service.revoke_api_key',
service_id=sample_api_key.service_id),
method='POST')
response = client.post(url_for('service.revoke_token', service_id=sample_token.service_id),
response = client.post(url_for('service.revoke_api_key', service_id=sample_api_key.service_id),
headers=[auth_header])
assert response.status_code == 202
tokens_for_service = Token.query.filter_by(service_id=sample_token.service_id).first()
assert tokens_for_service.expiry_date is not None
api_keys_for_service = ApiKey.query.filter_by(service_id=sample_api_key.service_id).first()
assert api_keys_for_service.expiry_date is not None
def test_create_service_should_create_new_token_for_service(notify_api, notify_db, notify_db_session, sample_user,
sample_admin_service_id):
def test_create_service_should_create_new_service_for_user(notify_api, notify_db, notify_db_session, sample_user,
sample_admin_service_id):
with notify_api.test_request_context():
with notify_api.test_client() as client:
data = {
@@ -394,12 +401,10 @@ def test_create_service_should_create_new_token_for_service(notify_api, notify_d
method='POST',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
assert Token.query.count() == 1
resp = client.post(url_for('service.create_service'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 201
assert Token.query.count() == 2
def test_create_template(notify_api, notify_db, notify_db_session, sample_service, sample_admin_service_id):

View File

@@ -19,7 +19,17 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sa
assert response.status_code == 200
json_resp = json.loads(response.get_data(as_text=True))
assert len(json_resp['data']) == 2
assert {"email_address": sample_user.email_address, "id": sample_user.id} in json_resp['data']
expected = {
"name": "Test User",
"email_address": sample_user.email_address,
"id": sample_user.id,
"mobile_number": "+44 7700 900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0
}
assert expected in json_resp['data']
def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
@@ -36,7 +46,17 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_
headers=[header])
assert resp.status_code == 200
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data'] == {"email_address": sample_user.email_address, "id": sample_user.id}
expected = {
"name": "Test User",
"email_address": sample_user.email_address,
"id": sample_user.id,
"mobile_number": "+44 7700 900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0
}
assert json_resp['data'] == expected
def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_service_id):
@@ -46,8 +66,16 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
data = {'email_address': 'user@digital.cabinet-office.gov.uk'}
data = {
"name": "Test User",
"email_address": "user@digital.cabinet-office.gov.uk",
"password": "password",
"mobile_number": "+44 7700 900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.create_user'),
method='POST',
@@ -73,7 +101,14 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
with notify_api.test_client() as client:
assert User.query.count() == 1
data = {
'blah': 'blah.blah'}
"name": "Test User",
"password": "password",
"mobile_number": "+44 7700 900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.create_user'),
method='POST',
@@ -89,6 +124,37 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_session, sample_admin_service_id):
"""
Tests POST endpoint '/' missing attribute password.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 1
data = {
"name": "Test User",
"email_address": "user@digital.cabinet-office.gov.uk",
"mobile_number": "+44 7700 900986",
"password_changed_at": None,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.create_user'),
method='POST',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
resp = client.post(
url_for('user.create_user'),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
assert User.query.count() == 1
json_resp = json.loads(resp.get_data(as_text=True))
assert {'password': ['Missing data for required field.']} == json_resp['message']
def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
"""
Tests PUT endpoint '/' to update a user.
@@ -98,7 +164,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
assert User.query.count() == 2
new_email = 'new@digital.cabinet-office.gov.uk'
data = {
'email_address': new_email}
'email_address': new_email
}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.update_user', user_id=sample_user.id),
method='PUT',
@@ -112,9 +179,18 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
assert User.query.count() == 2
user = User.query.filter_by(email_address=new_email).first()
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['data'] == {'email_address': new_email, 'id': user.id}
expected = {
"name": "Test User",
"email_address": new_email,
"mobile_number": "+44 7700 900986",
"password_changed_at": None,
"id": user.id,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0
}
assert json_resp['data'] == expected
assert json_resp['data']['email_address'] == new_email
assert json_resp['data']['id'] == user.id
def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
@@ -144,33 +220,6 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us
assert user.email_address != new_email
def test_put_user_missing_email(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
"""
Tests PUT endpoint '/' missing attribute email.
"""
with notify_api.test_request_context():
with notify_api.test_client() as client:
assert User.query.count() == 2
new_email = 'new@digital.cabinet-office.gov.uk'
data = {
'blah': new_email}
auth_header = create_authorization_header(service_id=sample_admin_service_id,
path=url_for('user.update_user', user_id=sample_user.id),
method='PUT',
request_body=json.dumps(data))
headers = [('Content-Type', 'application/json'), auth_header]
resp = client.put(
url_for('user.update_user', user_id=sample_user.id),
data=json.dumps(data),
headers=headers)
assert resp.status_code == 400
assert User.query.count() == 2
user = User.query.get(sample_user.id)
json_resp = json.loads(resp.get_data(as_text=True))
assert user.email_address == sample_user.email_address
assert {'email_address': ['Missing data for required field.']} == json_resp['message']
def test_get_user_services(notify_api, notify_db, notify_db_session, sample_service, sample_admin_service_id):
"""
Tests GET endpoint "/<user_id>/service/<service_id>" to retrieve services for a user.
@@ -280,7 +329,18 @@ def test_delete_user(notify_api, notify_db, notify_db_session, sample_user, samp
assert resp.status_code == 202
json_resp = json.loads(resp.get_data(as_text=True))
assert User.query.count() == 1
assert json_resp['data'] == {'id': sample_user.id, 'email_address': sample_user.email_address}
expected = {
"name": "Test User",
"email_address": sample_user.email_address,
"mobile_number": "+44 7700 900986",
"password_changed_at": None,
"id": sample_user.id,
"logged_in_at": None,
"state": "active",
"failed_login_count": 0
}
assert json_resp['data'] == expected
def test_delete_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):