mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-14 01:02:09 -05:00
Merge branch 'master' into email-templates
Conflicts: app/user/rest.py tests/app/user/test_rest.py
This commit is contained in:
@@ -2,6 +2,8 @@ from flask import (
|
||||
jsonify,
|
||||
current_app
|
||||
)
|
||||
from sqlalchemy.exc import SQLAlchemyError, DataError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
|
||||
def register_errors(blueprint):
|
||||
@@ -32,3 +34,18 @@ def register_errors(blueprint):
|
||||
def internal_server_error(e):
|
||||
current_app.logger.exception(e)
|
||||
return jsonify(error="Internal error"), 500
|
||||
|
||||
@blueprint.app_errorhandler(NoResultFound)
|
||||
def no_result_found(e):
|
||||
current_app.logger.error(e)
|
||||
return jsonify(error="No result found"), 404
|
||||
|
||||
@blueprint.app_errorhandler(DataError)
|
||||
def no_result_found(e):
|
||||
current_app.logger.error(e)
|
||||
return jsonify(error="No result found"), 404
|
||||
|
||||
@blueprint.app_errorhandler(SQLAlchemyError)
|
||||
def db_error(e):
|
||||
current_app.logger.error(e)
|
||||
return jsonify(error="Database error occurred"), 500
|
||||
|
||||
@@ -3,7 +3,7 @@ import uuid
|
||||
from sqlalchemy import UniqueConstraint, Sequence
|
||||
from . import db
|
||||
import datetime
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from sqlalchemy.dialects.postgresql import (UUID, ARRAY)
|
||||
from app.encryption import (
|
||||
hashpw,
|
||||
check_hash
|
||||
@@ -40,6 +40,7 @@ class User(db.Model):
|
||||
logged_in_at = db.Column(db.DateTime, nullable=True)
|
||||
failed_login_count = db.Column(db.Integer, nullable=False, default=0)
|
||||
state = db.Column(db.String, nullable=False, default='pending')
|
||||
permissions = db.Column("permissions", ARRAY(db.String))
|
||||
|
||||
@property
|
||||
def password(self):
|
||||
|
||||
@@ -62,7 +62,8 @@ class JobSchema(BaseSchema):
|
||||
model = models.Job
|
||||
|
||||
|
||||
class RequestVerifyCodeSchema(ma.Schema):
|
||||
# TODO: Remove this schema once the admin app has stopped using the /user/<user_id>code endpoint
|
||||
class OldRequestVerifyCodeSchema(ma.Schema):
|
||||
|
||||
code_type = fields.Str(required=True)
|
||||
to = fields.Str(required=False)
|
||||
@@ -73,6 +74,10 @@ class RequestVerifyCodeSchema(ma.Schema):
|
||||
raise ValidationError('Invalid code type')
|
||||
|
||||
|
||||
class RequestVerifyCodeSchema(ma.Schema):
|
||||
to = fields.Str(required=False)
|
||||
|
||||
|
||||
# TODO main purpose to be added later
|
||||
# when processing templates, template will be
|
||||
# common for all notifications.
|
||||
@@ -154,6 +159,8 @@ api_keys_schema = ApiKeySchema(many=True)
|
||||
job_schema = JobSchema()
|
||||
job_schema_load_json = JobSchema(load_json=True)
|
||||
jobs_schema = JobSchema(many=True)
|
||||
# TODO: Remove this schema once the admin app has stopped using the /user/<user_id>code endpoint
|
||||
old_request_verify_code_schema = OldRequestVerifyCodeSchema()
|
||||
request_verify_code_schema = RequestVerifyCodeSchema()
|
||||
sms_admin_notification_schema = SmsAdminNotificationSchema()
|
||||
sms_template_notification_schema = SmsTemplateNotificationSchema()
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
from datetime import datetime
|
||||
from flask import (jsonify, request, abort, Blueprint, current_app)
|
||||
from sqlalchemy.exc import DataError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
from app import encryption
|
||||
|
||||
from app.dao.users_dao import (
|
||||
@@ -14,11 +12,13 @@ from app.dao.users_dao import (
|
||||
reset_failed_login_count
|
||||
)
|
||||
from app.schemas import (
|
||||
old_request_verify_code_schema,
|
||||
user_schema,
|
||||
users_schema,
|
||||
request_verify_code_schema,
|
||||
user_schema_load_json
|
||||
)
|
||||
|
||||
from app.celery.tasks import (send_sms_code, send_email_code)
|
||||
from app.errors import register_errors
|
||||
|
||||
@@ -45,6 +45,7 @@ def update_user(user_id):
|
||||
user = get_model_users(user_id=user_id)
|
||||
if not user:
|
||||
return jsonify(result="error", message="User not found"), 404
|
||||
|
||||
req_json = request.get_json()
|
||||
update_dct, errors = user_schema_load_json.load(req_json)
|
||||
pwd = req_json.get('password', None)
|
||||
@@ -55,18 +56,14 @@ def update_user(user_id):
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
status_code = 200
|
||||
save_model_user(user, update_dict=update_dct, pwd=pwd)
|
||||
return jsonify(data=user_schema.dump(user).data), status_code
|
||||
save_model_user(user_to_update, update_dict=update_dct, pwd=pwd)
|
||||
return jsonify(data=user_schema.dump(user_to_update).data), status_code
|
||||
|
||||
|
||||
@user.route('/<int:user_id>/verify/password', methods=['POST'])
|
||||
def verify_user_password(user_id):
|
||||
try:
|
||||
user = get_model_users(user_id=user_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid user id"), 400
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="User not found"), 404
|
||||
user_to_verify = get_model_users(user_id=user_id)
|
||||
|
||||
txt_pwd = None
|
||||
try:
|
||||
txt_pwd = request.get_json()['password']
|
||||
@@ -74,22 +71,18 @@ def verify_user_password(user_id):
|
||||
return jsonify(
|
||||
result="error",
|
||||
message={'password': ['Required field missing data']}), 400
|
||||
if user.check_password(txt_pwd):
|
||||
reset_failed_login_count(user)
|
||||
if user_to_verify.check_password(txt_pwd):
|
||||
reset_failed_login_count(user_to_verify)
|
||||
return jsonify({}), 204
|
||||
else:
|
||||
increment_failed_login_count(user)
|
||||
increment_failed_login_count(user_to_verify)
|
||||
return jsonify(result='error', message={'password': ['Incorrect password']}), 400
|
||||
|
||||
|
||||
@user.route('/<int:user_id>/verify/code', methods=['POST'])
|
||||
def verify_user_code(user_id):
|
||||
try:
|
||||
user = get_model_users(user_id=user_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid user id"), 400
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="User not found"), 404
|
||||
user_to_verify = get_model_users(user_id=user_id)
|
||||
|
||||
txt_code = None
|
||||
resp_json = request.get_json()
|
||||
txt_type = None
|
||||
@@ -104,7 +97,7 @@ def verify_user_code(user_id):
|
||||
errors.update({'code_type': ['Required field missing data']})
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
code = get_user_code(user, txt_code, txt_type)
|
||||
code = get_user_code(user_to_verify, txt_code, txt_type)
|
||||
if not code:
|
||||
return jsonify(result="error", message="Code not found"), 404
|
||||
if datetime.now() > code.expiry_datetime or code.code_used:
|
||||
@@ -113,14 +106,9 @@ def verify_user_code(user_id):
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
@user.route('/<int:user_id>/code', methods=['POST'])
|
||||
def send_user_code(user_id):
|
||||
try:
|
||||
user = get_model_users(user_id=user_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid user id"), 400
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="User not found"), 404
|
||||
@user.route('/<int:user_id>/sms-code', methods=['POST'])
|
||||
def send_user_sms_code(user_id):
|
||||
user_to_send_to = get_model_users(user_id=user_id)
|
||||
|
||||
verify_code, errors = request_verify_code_schema.load(request.get_json())
|
||||
if errors:
|
||||
@@ -128,13 +116,54 @@ def send_user_code(user_id):
|
||||
|
||||
from app.dao.users_dao import create_secret_code
|
||||
secret_code = create_secret_code()
|
||||
create_user_code(user, secret_code, verify_code.get('code_type'))
|
||||
create_user_code(user_to_send_to, secret_code, 'sms')
|
||||
|
||||
mobile = user_to_send_to.mobile_number if verify_code.get('to', None) is None else verify_code.get('to')
|
||||
verification_message = {'to': mobile, 'secret_code': secret_code}
|
||||
|
||||
send_sms_code.apply_async([encryption.encrypt(verification_message)], queue='sms-code')
|
||||
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
@user.route('/<int:user_id>/email-code', methods=['POST'])
|
||||
def send_user_email_code(user_id):
|
||||
user_to_send_to = get_model_users(user_id=user_id)
|
||||
print(user_to_send_to)
|
||||
verify_code, errors = request_verify_code_schema.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
|
||||
from app.dao.users_dao import create_secret_code
|
||||
secret_code = create_secret_code()
|
||||
create_user_code(user_to_send_to, secret_code, 'email')
|
||||
|
||||
email = user_to_send_to.email_address if verify_code.get('to', None) is None else verify_code.get('to')
|
||||
verification_message = {'to': email, 'secret_code': secret_code}
|
||||
|
||||
send_email_code.apply_async([encryption.encrypt(verification_message)], queue='email-code')
|
||||
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
# TODO: Remove this method once the admin app has stopped using it.
|
||||
@user.route('/<int:user_id>/code', methods=['POST'])
|
||||
def send_user_code(user_id):
|
||||
user_to_send_to = get_model_users(user_id=user_id)
|
||||
|
||||
verify_code, errors = old_request_verify_code_schema.load(request.get_json())
|
||||
if errors:
|
||||
return jsonify(result="error", message=errors), 400
|
||||
|
||||
from app.dao.users_dao import create_secret_code
|
||||
secret_code = create_secret_code()
|
||||
create_user_code(user_to_send_to, secret_code, verify_code.get('code_type'))
|
||||
if verify_code.get('code_type') == 'sms':
|
||||
mobile = user.mobile_number if verify_code.get('to', None) is None else verify_code.get('to')
|
||||
mobile = user_to_send_to.mobile_number if verify_code.get('to', None) is None else verify_code.get('to')
|
||||
verification_message = {'to': mobile, 'secret_code': secret_code}
|
||||
send_sms_code.apply_async([encryption.encrypt(verification_message)], queue='sms-code')
|
||||
elif verify_code.get('code_type') == 'email':
|
||||
email = user.email_address if verify_code.get('to', None) is None else verify_code.get('to')
|
||||
email = user_to_send_to.email_address if verify_code.get('to', None) is None else verify_code.get('to')
|
||||
verification_message = {
|
||||
'to_address': email,
|
||||
'from_address': current_app.config['VERIFY_CODE_FROM_EMAIL_ADDRESS'],
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#!/bin/bash
|
||||
export NOTIFY_API_ENVIRONMENT='config.Test'
|
||||
export ADMIN_CLIENT_USER_NAME='dev-notify-admin'
|
||||
export ADMIN_CLIENT_SECRET='dev-notify-secret-key'
|
||||
|
||||
26
migrations/versions/0015_add_permissions.py
Normal file
26
migrations/versions/0015_add_permissions.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0015_add_permissions
|
||||
Revises: 0014_job_id_nullable
|
||||
Create Date: 2016-02-19 14:16:59.359493
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0015_add_permissions'
|
||||
down_revision = '0014_job_id_nullable'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column('users', sa.Column('permissions', postgresql.ARRAY(sa.String()), nullable=True))
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_column('users', 'permissions')
|
||||
### end Alembic commands ###
|
||||
@@ -1,14 +1,14 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 0015_add_subject_line
|
||||
Revises: 0014_job_id_nullable
|
||||
Revises: 0015_add_permissions
|
||||
Create Date: 2016-02-18 09:43:29.282804
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '0015_add_subject_line'
|
||||
down_revision = '0014_job_id_nullable'
|
||||
down_revision = '0015_add_permissions'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
@@ -32,7 +32,8 @@ def sample_user(notify_db,
|
||||
'email_address': email,
|
||||
'password': 'password',
|
||||
'mobile_number': '+447700900986',
|
||||
'state': 'active'
|
||||
'state': 'active',
|
||||
'permissions': []
|
||||
}
|
||||
usr = User.query.filter_by(email_address=email).first()
|
||||
if not usr:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
from flask import url_for
|
||||
from app.models import (User, Service)
|
||||
from app.dao.users_dao import save_model_user
|
||||
from tests import create_authorization_header
|
||||
from tests.app.conftest import sample_service as create_sample_service
|
||||
|
||||
@@ -27,7 +28,8 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sa
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
}
|
||||
assert expected in json_resp['data']
|
||||
|
||||
@@ -54,7 +56,8 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
@@ -74,7 +77,8 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
@@ -107,7 +111,8 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
@@ -138,7 +143,8 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
@@ -166,7 +172,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': new_email,
|
||||
'mobile_number': sample_user.mobile_number
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'permissions': []
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
@@ -189,7 +196,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
||||
"id": user.id,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
assert json_resp['data']['email_address'] == new_email
|
||||
@@ -211,7 +219,8 @@ def test_put_user_update_password(notify_api,
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'password': new_password
|
||||
'password': new_password,
|
||||
'permissions': []
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
@@ -248,22 +257,280 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us
|
||||
assert User.query.count() == 2
|
||||
new_email = 'new@digital.cabinet-office.gov.uk'
|
||||
data = {'email_address': new_email}
|
||||
auth_header = create_authorization_header(
|
||||
service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id="9999"),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id="9999"),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id="9999"),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
print(json_resp)
|
||||
assert resp.status_code == 404
|
||||
assert User.query.count() == 2
|
||||
user = User.query.filter_by(id=sample_user.id).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'result': 'error', 'message': 'User not found'}
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
assert user == sample_user
|
||||
assert user.email_address != new_email
|
||||
|
||||
|
||||
def test_get_user_services(notify_api, notify_db, notify_db_session, sample_service, sample_admin_service_id):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" to retrieve services for a user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
user = User.query.get(sample_service.users[0].id)
|
||||
another_name = "another name"
|
||||
create_sample_service(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service_name=another_name,
|
||||
user=user)
|
||||
assert Service.query.count() == 3
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.get_service_by_user_id', user_id=user.id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id=user.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert len(json_resp['data']) == 2
|
||||
|
||||
|
||||
def test_get_user_service(notify_api, notify_db, notify_db_session, sample_service, sample_admin_service_id):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" to retrieve a service for a user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
user = User.query.first()
|
||||
another_name = "another name"
|
||||
another_service = create_sample_service(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
service_name=another_name,
|
||||
user=user)
|
||||
assert Service.query.count() == 3
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.get_service_by_user_id', user_id=user.id,
|
||||
service_id=another_service.id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id=user.id, service_id=another_service.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 200
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp['data']['name'] == another_name
|
||||
assert json_resp['data']['id'] == str(another_service.id)
|
||||
|
||||
|
||||
def test_get_user_service_user_not_exists(notify_api, sample_service, sample_admin_service_id):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" 404 is returned for invalid user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert Service.query.count() == 2
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.get_service_by_user_id', user_id="123423",
|
||||
service_id=sample_service.id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id="123423", service_id=sample_service.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
|
||||
|
||||
def test_get_user_service_service_not_exists(notify_api, sample_service):
|
||||
"""
|
||||
Tests GET endpoint "/<user_id>/service/<service_id>" 404 is returned for invalid service.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
user = User.query.first()
|
||||
assert Service.query.count() == 1
|
||||
import uuid
|
||||
missing_service_id = uuid.uuid4()
|
||||
auth_header = create_authorization_header(path=url_for('user.get_service_by_user_id', user_id=user.id,
|
||||
service_id=missing_service_id),
|
||||
method='GET')
|
||||
resp = client.get(
|
||||
url_for('user.get_service_by_user_id', user_id=user.id, service_id=missing_service_id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
|
||||
|
||||
def test_delete_user(notify_api, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests DELETE endpoint '/<user_id>' delete user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 2
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
method='DELETE')
|
||||
resp = client.delete(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 202
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert User.query.count() == 1
|
||||
expected = {
|
||||
"name": "Test User",
|
||||
"email_address": sample_user.email_address,
|
||||
"mobile_number": "+447700900986",
|
||||
"password_changed_at": None,
|
||||
"id": sample_user.id,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": []
|
||||
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
|
||||
def test_delete_user_not_exists(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests DELETE endpoint '/<user_id>' delete user.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 2
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id='99999'),
|
||||
method='DELETE')
|
||||
resp = client.delete(
|
||||
url_for('user.update_user', user_id="99999"),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
assert User.query.count() == 2
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
assert json_resp == {'error': 'No result found'}
|
||||
|
||||
|
||||
def test_post_with_permissions(notify_api, notify_db, notify_db_session, sample_admin_service_id):
|
||||
"""
|
||||
Tests POST endpoint '/' to create a user with permissions.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
assert User.query.count() == 1
|
||||
permissions = ['new permission']
|
||||
data = {
|
||||
"name": "Test User",
|
||||
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||
"password": "password",
|
||||
"mobile_number": "+447700900986",
|
||||
"password_changed_at": None,
|
||||
"logged_in_at": None,
|
||||
"state": "active",
|
||||
"failed_login_count": 0,
|
||||
"permissions": permissions
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.create_user'),
|
||||
method='POST',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.post(
|
||||
url_for('user.create_user'),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 201
|
||||
user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
json_resp['data'] == {"email_address": user.email_address, "id": user.id}
|
||||
assert json_resp['data']['email_address'] == user.email_address
|
||||
assert json_resp['data']['id'] == user.id
|
||||
assert json_resp['data']['permissions'] == permissions
|
||||
|
||||
|
||||
def test_put_add_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests PUT endpoint '/' to update user permissions.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
permissions = ['one permission', 'another permission']
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'permissions': permissions
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 2
|
||||
user = User.query.filter_by(email_address=sample_user.email_address).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
expected = {
|
||||
"name": user.name,
|
||||
"email_address": user.email_address,
|
||||
"mobile_number": user.mobile_number,
|
||||
"password_changed_at": None,
|
||||
"id": user.id,
|
||||
"logged_in_at": None,
|
||||
"state": user.state,
|
||||
"failed_login_count": 0,
|
||||
"permissions": permissions
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
|
||||
def test_put_remove_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||
"""
|
||||
Tests PUT endpoint '/' to update user permissions.
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
old_permissions = ['one permission', 'another permission']
|
||||
save_model_user(sample_user, {'permissions': old_permissions})
|
||||
permissions = ['new permissions']
|
||||
data = {
|
||||
'name': sample_user.name,
|
||||
'email_address': sample_user.email_address,
|
||||
'mobile_number': sample_user.mobile_number,
|
||||
'permissions': permissions
|
||||
}
|
||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||
path=url_for('user.update_user', user_id=sample_user.id),
|
||||
method='PUT',
|
||||
request_body=json.dumps(data))
|
||||
headers = [('Content-Type', 'application/json'), auth_header]
|
||||
resp = client.put(
|
||||
url_for('user.update_user', user_id=sample_user.id),
|
||||
data=json.dumps(data),
|
||||
headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert User.query.count() == 2
|
||||
user = User.query.filter_by(email_address=sample_user.email_address).first()
|
||||
json_resp = json.loads(resp.get_data(as_text=True))
|
||||
expected = {
|
||||
"name": user.name,
|
||||
"email_address": user.email_address,
|
||||
"mobile_number": user.mobile_number,
|
||||
"password_changed_at": None,
|
||||
"id": user.id,
|
||||
"logged_in_at": None,
|
||||
"state": user.state,
|
||||
"failed_login_count": 0,
|
||||
"permissions": permissions
|
||||
}
|
||||
assert json_resp['data'] == expected
|
||||
|
||||
@@ -2,9 +2,7 @@ import json
|
||||
import moto
|
||||
from datetime import (datetime, timedelta)
|
||||
from flask import url_for
|
||||
|
||||
from app.models import (VerifyCode)
|
||||
|
||||
import app.celery.tasks
|
||||
from app import db, encryption
|
||||
from tests import create_authorization_header
|
||||
@@ -191,7 +189,6 @@ def test_user_verify_password_valid_password_resets_failed_logins(notify_api,
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
sample_user):
|
||||
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps({'password': 'bad password'})
|
||||
@@ -249,9 +246,10 @@ def test_user_verify_password_missing_password(notify_api,
|
||||
assert 'Required field missing data' in json_resp['message']['password']
|
||||
|
||||
|
||||
# TODO: Remove this test once the admin app has stopped using it.
|
||||
def test_send_user_code_for_sms(notify_api,
|
||||
sample_sms_code,
|
||||
mock_secret_code,
|
||||
mock_encryption,
|
||||
mock_celery_send_sms_code):
|
||||
"""
|
||||
Tests POST endpoint '/<user_id>/code' successful sms
|
||||
@@ -269,35 +267,11 @@ def test_send_user_code_for_sms(notify_api,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 204
|
||||
encrpyted = encryption.encrypt({'to': sample_sms_code.user.mobile_number, 'secret_code': '11111'})
|
||||
app.celery.tasks.send_sms_code.apply_async.assert_called_once_with([encrpyted], queue='sms-code')
|
||||
|
||||
|
||||
def test_send_user_code_for_sms_with_optional_to_field(notify_api,
|
||||
sample_sms_code,
|
||||
mock_secret_code,
|
||||
mock_celery_send_sms_code):
|
||||
"""
|
||||
Tests POST endpoint '/<user_id>/code' successful sms with optional to field
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
|
||||
data = json.dumps({'code_type': 'sms', 'to': '+441119876757'})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.send_user_code', user_id=sample_sms_code.user.id),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
resp = client.post(
|
||||
url_for('user.send_user_code', user_id=sample_sms_code.user.id),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 204
|
||||
encrypted = encryption.encrypt({'to': '+441119876757', 'secret_code': '11111'})
|
||||
app.celery.tasks.send_sms_code.apply_async.assert_called_once_with([encrypted], queue='sms-code')
|
||||
app.celery.tasks.send_sms_code.apply_async.assert_called_once_with(['something_encrypted'],
|
||||
queue='sms-code')
|
||||
|
||||
|
||||
# TODO: Remove this test once the admin app has stopped using it.
|
||||
def test_send_user_code_for_email(notify_api,
|
||||
sample_email_code,
|
||||
mock_secret_code,
|
||||
@@ -323,6 +297,7 @@ def test_send_user_code_for_email(notify_api,
|
||||
queue='email-code')
|
||||
|
||||
|
||||
# TODO: Remove this test once the admin app has stopped using it.
|
||||
def test_send_user_code_for_email_uses_optional_to_field(notify_api,
|
||||
sample_email_code,
|
||||
mock_secret_code,
|
||||
@@ -348,16 +323,130 @@ def test_send_user_code_for_email_uses_optional_to_field(notify_api,
|
||||
queue='email-code')
|
||||
|
||||
|
||||
def test_request_verify_code_schema_invalid_code_type(notify_api, notify_db, notify_db_session, sample_user):
|
||||
from app.schemas import request_verify_code_schema
|
||||
# TODO: Remove this test once the admin app has stopped using it.
|
||||
def test_request_verify_code_schema_invalid_code_type(notify_api, sample_user):
|
||||
from app.schemas import old_request_verify_code_schema
|
||||
data = json.dumps({'code_type': 'not_sms'})
|
||||
code, error = request_verify_code_schema.loads(data)
|
||||
code, error = old_request_verify_code_schema.loads(data)
|
||||
assert error == {'code_type': ['Invalid code type']}
|
||||
|
||||
|
||||
def test_request_verify_code_schema_with_to(notify_api, notify_db, notify_db_session, sample_user):
|
||||
from app.schemas import request_verify_code_schema
|
||||
# TODO: Remove this method once the admin app has stopped using it.
|
||||
def test_request_verify_code_schema_with_to(notify_api, sample_user):
|
||||
from app.schemas import old_request_verify_code_schema
|
||||
data = json.dumps({'code_type': 'sms', 'to': 'some@one.gov.uk'})
|
||||
code, error = request_verify_code_schema.loads(data)
|
||||
code, error = old_request_verify_code_schema.loads(data)
|
||||
assert code == {'code_type': 'sms', 'to': 'some@one.gov.uk'}
|
||||
assert error == {}
|
||||
|
||||
|
||||
def test_send_user_sms_code(notify_api,
|
||||
sample_sms_code,
|
||||
mock_celery_send_sms_code,
|
||||
mock_encryption):
|
||||
"""
|
||||
Tests POST endpoint /user/<user_id>/sms-code
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps({})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
print(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 204
|
||||
app.celery.tasks.send_sms_code.apply_async.assert_called_once_with(['something_encrypted'],
|
||||
queue='sms-code')
|
||||
|
||||
|
||||
def test_send_user_code_for_sms_with_optional_to_field(notify_api,
|
||||
sample_sms_code,
|
||||
mock_secret_code,
|
||||
mock_celery_send_sms_code):
|
||||
"""
|
||||
Tests POST endpoint '/<user_id>/code' successful sms with optional to field
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps({'to': '+441119876757'})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
assert resp.status_code == 204
|
||||
encrypted = encryption.encrypt({'to': '+441119876757', 'secret_code': '11111'})
|
||||
app.celery.tasks.send_sms_code.apply_async.assert_called_once_with([encrypted], queue='sms-code')
|
||||
|
||||
|
||||
def test_send_sms_code_returns_404_for_bad_input_data(notify_api, notify_db, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint /user/<user_id>/sms-code return 404 for bad input data
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps({})
|
||||
import uuid
|
||||
uuid_ = uuid.uuid4()
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.send_user_sms_code', user_id=uuid_),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=uuid_),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
assert json.loads(resp.get_data(as_text=True))['error'] == 'No result found'
|
||||
|
||||
|
||||
def test_send_user_email_code(notify_api,
|
||||
sample_email_code,
|
||||
mock_celery_send_email_code,
|
||||
mock_encryption):
|
||||
"""
|
||||
Tests POST endpoint /user/<user_id>/email-code
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps({})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.send_user_email_code', user_id=sample_email_code.user.id),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
resp = client.post(
|
||||
url_for('user.send_user_email_code', user_id=sample_email_code.user.id),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
print(resp.get_data(as_text=True))
|
||||
assert resp.status_code == 204
|
||||
app.celery.tasks.send_email_code.apply_async.assert_called_once_with(['something_encrypted'],
|
||||
queue='email-code')
|
||||
|
||||
|
||||
def test_send_user_email_code_returns_404_for_when_user_does_not_exist(notify_api, notify_db, notify_db_session):
|
||||
"""
|
||||
Tests POST endpoint /user/<user_id>/email-code return 404 for missing user
|
||||
"""
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
data = json.dumps({})
|
||||
auth_header = create_authorization_header(
|
||||
path=url_for('user.send_user_email_code', user_id=1),
|
||||
method='POST',
|
||||
request_body=data)
|
||||
resp = client.post(
|
||||
url_for('user.send_user_email_code', user_id=1),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
assert json.loads(resp.get_data(as_text=True))['error'] == 'No result found'
|
||||
|
||||
Reference in New Issue
Block a user