diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 7d094c4eb..cdf7ca781 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -46,9 +46,9 @@ def send_sms_code(encrypted_verification): def send_email_code(encrypted_verification_message): verification_message = encryption.decrypt(encrypted_verification_message) try: - aws_ses_client.send_email(verification_message['from_address'], - verification_message['to_address'], - verification_message['subject'], - verification_message['body']) + aws_ses_client.send_email(current_app.config['VERIFY_CODE_FROM_EMAIL_ADDRESS'], + verification_message['to'], + "Verification code", + verification_message['secret_code']) except AwsSesClientException as e: current_app.logger.error(e) diff --git a/app/errors.py b/app/errors.py index 3deac5600..da3b719ca 100644 --- a/app/errors.py +++ b/app/errors.py @@ -2,6 +2,8 @@ from flask import ( jsonify, current_app ) +from sqlalchemy.exc import SQLAlchemyError, DataError +from sqlalchemy.orm.exc import NoResultFound def register_errors(blueprint): @@ -32,3 +34,18 @@ def register_errors(blueprint): def internal_server_error(e): current_app.logger.exception(e) return jsonify(error="Internal error"), 500 + + @blueprint.app_errorhandler(NoResultFound) + def no_result_found(e): + current_app.logger.error(e) + return jsonify(error="No result found"), 404 + + @blueprint.app_errorhandler(DataError) + def no_result_found(e): + current_app.logger.error(e) + return jsonify(error="No result found"), 404 + + @blueprint.app_errorhandler(SQLAlchemyError) + def db_error(e): + current_app.logger.error(e) + return jsonify(error="Database error occurred"), 500 diff --git a/app/models.py b/app/models.py index b37ce4181..ce810287a 100644 --- a/app/models.py +++ b/app/models.py @@ -3,7 +3,7 @@ import uuid from sqlalchemy import UniqueConstraint, Sequence from . import db import datetime -from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.dialects.postgresql import (UUID, ARRAY) from app.encryption import ( hashpw, check_hash @@ -40,6 +40,7 @@ class User(db.Model): logged_in_at = db.Column(db.DateTime, nullable=True) failed_login_count = db.Column(db.Integer, nullable=False, default=0) state = db.Column(db.String, nullable=False, default='pending') + permissions = db.Column("permissions", ARRAY(db.String)) @property def password(self): diff --git a/app/schemas.py b/app/schemas.py index 18007f91d..0d8724e00 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -62,7 +62,8 @@ class JobSchema(BaseSchema): model = models.Job -class RequestVerifyCodeSchema(ma.Schema): +# TODO: Remove this schema once the admin app has stopped using the /user/code endpoint +class OldRequestVerifyCodeSchema(ma.Schema): code_type = fields.Str(required=True) to = fields.Str(required=False) @@ -73,6 +74,10 @@ class RequestVerifyCodeSchema(ma.Schema): raise ValidationError('Invalid code type') +class RequestVerifyCodeSchema(ma.Schema): + to = fields.Str(required=False) + + # TODO main purpose to be added later # when processing templates, template will be # common for all notifications. @@ -154,6 +159,8 @@ api_keys_schema = ApiKeySchema(many=True) job_schema = JobSchema() job_schema_load_json = JobSchema(load_json=True) jobs_schema = JobSchema(many=True) +# TODO: Remove this schema once the admin app has stopped using the /user/code endpoint +old_request_verify_code_schema = OldRequestVerifyCodeSchema() request_verify_code_schema = RequestVerifyCodeSchema() sms_admin_notification_schema = SmsAdminNotificationSchema() sms_template_notification_schema = SmsTemplateNotificationSchema() diff --git a/app/user/rest.py b/app/user/rest.py index bc8e6a7a6..fbee92598 100644 --- a/app/user/rest.py +++ b/app/user/rest.py @@ -1,7 +1,5 @@ from datetime import datetime from flask import (jsonify, request, abort, Blueprint, current_app) -from sqlalchemy.exc import DataError -from sqlalchemy.orm.exc import NoResultFound from app import encryption from app.dao.users_dao import ( @@ -14,11 +12,13 @@ from app.dao.users_dao import ( reset_failed_login_count ) from app.schemas import ( + old_request_verify_code_schema, user_schema, users_schema, request_verify_code_schema, user_schema_load_json ) + from app.celery.tasks import (send_sms_code, send_email_code) from app.errors import register_errors @@ -42,9 +42,10 @@ def create_user(): @user.route('/', methods=['PUT']) def update_user(user_id): - user = get_model_users(user_id=user_id) - if not user: + user_to_update = get_model_users(user_id=user_id) + if not user_to_update: return jsonify(result="error", message="User not found"), 404 + req_json = request.get_json() update_dct, errors = user_schema_load_json.load(req_json) pwd = req_json.get('password', None) @@ -55,18 +56,14 @@ def update_user(user_id): if errors: return jsonify(result="error", message=errors), 400 status_code = 200 - save_model_user(user, update_dict=update_dct, pwd=pwd) - return jsonify(data=user_schema.dump(user).data), status_code + save_model_user(user_to_update, update_dict=update_dct, pwd=pwd) + return jsonify(data=user_schema.dump(user_to_update).data), status_code @user.route('//verify/password', methods=['POST']) def verify_user_password(user_id): - try: - user = get_model_users(user_id=user_id) - except DataError: - return jsonify(result="error", message="Invalid user id"), 400 - except NoResultFound: - return jsonify(result="error", message="User not found"), 404 + user_to_verify = get_model_users(user_id=user_id) + txt_pwd = None try: txt_pwd = request.get_json()['password'] @@ -74,22 +71,18 @@ def verify_user_password(user_id): return jsonify( result="error", message={'password': ['Required field missing data']}), 400 - if user.check_password(txt_pwd): - reset_failed_login_count(user) + if user_to_verify.check_password(txt_pwd): + reset_failed_login_count(user_to_verify) return jsonify({}), 204 else: - increment_failed_login_count(user) + increment_failed_login_count(user_to_verify) return jsonify(result='error', message={'password': ['Incorrect password']}), 400 @user.route('//verify/code', methods=['POST']) def verify_user_code(user_id): - try: - user = get_model_users(user_id=user_id) - except DataError: - return jsonify(result="error", message="Invalid user id"), 400 - except NoResultFound: - return jsonify(result="error", message="User not found"), 404 + user_to_verify = get_model_users(user_id=user_id) + txt_code = None resp_json = request.get_json() txt_type = None @@ -104,7 +97,7 @@ def verify_user_code(user_id): errors.update({'code_type': ['Required field missing data']}) if errors: return jsonify(result="error", message=errors), 400 - code = get_user_code(user, txt_code, txt_type) + code = get_user_code(user_to_verify, txt_code, txt_type) if not code: return jsonify(result="error", message="Code not found"), 404 if datetime.now() > code.expiry_datetime or code.code_used: @@ -113,14 +106,12 @@ def verify_user_code(user_id): return jsonify({}), 204 -@user.route('//code', methods=['POST']) -def send_user_code(user_id): - try: - user = get_model_users(user_id=user_id) - except DataError: - return jsonify(result="error", message="Invalid user id"), 400 - except NoResultFound: - return jsonify(result="error", message="User not found"), 404 +@user.route('//sms-code', methods=['POST']) +def send_user_sms_code(user_id): + user_to_send_to = get_model_users(user_id=user_id) + + if not user_to_send_to: + return jsonify(result="error", message="No user found"), 404 verify_code, errors = request_verify_code_schema.load(request.get_json()) if errors: @@ -128,13 +119,59 @@ def send_user_code(user_id): from app.dao.users_dao import create_secret_code secret_code = create_secret_code() - create_user_code(user, secret_code, verify_code.get('code_type')) + create_user_code(user_to_send_to, secret_code, 'sms') + + mobile = user_to_send_to.mobile_number if verify_code.get('to', None) is None else verify_code.get('to') + verification_message = {'to': mobile, 'secret_code': secret_code} + + send_sms_code.apply_async([encryption.encrypt(verification_message)], queue='sms-code') + + return jsonify({}), 204 + + +@user.route('//email-code', methods=['POST']) +def send_user_email_code(user_id): + user_to_send_to = get_model_users(user_id=user_id) + if not user_to_send_to: + return jsonify(result="error", message="No user found"), 404 + + verify_code, errors = request_verify_code_schema.load(request.get_json()) + if errors: + return jsonify(result="error", message=errors), 400 + + from app.dao.users_dao import create_secret_code + secret_code = create_secret_code() + create_user_code(user_to_send_to, secret_code, 'email') + + email = user_to_send_to.email_address if verify_code.get('to', None) is None else verify_code.get('to') + verification_message = {'to': email, 'secret_code': secret_code} + + send_email_code.apply_async([encryption.encrypt(verification_message)], queue='email-code') + + return jsonify({}), 204 + + +# TODO: Remove this method once the admin app has stopped using it. +@user.route('//code', methods=['POST']) +def send_user_code(user_id): + user_to_send_to = get_model_users(user_id=user_id) + + if not user_to_send_to: + return jsonify(result="error", message="not found"), 404 + + verify_code, errors = old_request_verify_code_schema.load(request.get_json()) + if errors: + return jsonify(result="error", message=errors), 400 + + from app.dao.users_dao import create_secret_code + secret_code = create_secret_code() + create_user_code(user_to_send_to, secret_code, verify_code.get('code_type')) if verify_code.get('code_type') == 'sms': - mobile = user.mobile_number if verify_code.get('to', None) is None else verify_code.get('to') + mobile = user_to_send_to.mobile_number if verify_code.get('to', None) is None else verify_code.get('to') verification_message = {'to': mobile, 'secret_code': secret_code} send_sms_code.apply_async([encryption.encrypt(verification_message)], queue='sms-code') elif verify_code.get('code_type') == 'email': - email = user.email_address if verify_code.get('to', None) is None else verify_code.get('to') + email = user_to_send_to.email_address if verify_code.get('to', None) is None else verify_code.get('to') verification_message = { 'to_address': email, 'from_address': current_app.config['VERIFY_CODE_FROM_EMAIL_ADDRESS'], diff --git a/environment_test.sh b/environment_test.sh index bbc44a855..c4ce96bcf 100644 --- a/environment_test.sh +++ b/environment_test.sh @@ -1,3 +1,4 @@ +#!/bin/bash export NOTIFY_API_ENVIRONMENT='config.Test' export ADMIN_CLIENT_USER_NAME='dev-notify-admin' export ADMIN_CLIENT_SECRET='dev-notify-secret-key' diff --git a/migrations/versions/0015_add_permissions.py b/migrations/versions/0015_add_permissions.py new file mode 100644 index 000000000..507a5effa --- /dev/null +++ b/migrations/versions/0015_add_permissions.py @@ -0,0 +1,26 @@ +"""empty message + +Revision ID: 0015_add_permissions +Revises: 0014_job_id_nullable +Create Date: 2016-02-19 14:16:59.359493 + +""" + +# revision identifiers, used by Alembic. +revision = '0015_add_permissions' +down_revision = '0014_job_id_nullable' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +def upgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.add_column('users', sa.Column('permissions', postgresql.ARRAY(sa.String()), nullable=True)) + ### end Alembic commands ### + + +def downgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.drop_column('users', 'permissions') + ### end Alembic commands ### diff --git a/migrations/versions/0015_add_subject_line.py b/migrations/versions/0015_add_subject_line.py index 32dae82a8..88102327d 100644 --- a/migrations/versions/0015_add_subject_line.py +++ b/migrations/versions/0015_add_subject_line.py @@ -1,14 +1,14 @@ """empty message Revision ID: 0015_add_subject_line -Revises: 0014_job_id_nullable +Revises: 0015_add_permissions Create Date: 2016-02-18 09:43:29.282804 """ # revision identifiers, used by Alembic. revision = '0015_add_subject_line' -down_revision = '0014_job_id_nullable' +down_revision = '0015_add_permissions' from alembic import op import sqlalchemy as sa diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index d303e338e..37817c162 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1,5 +1,6 @@ import uuid import pytest +from flask import current_app from app.celery.tasks import (send_sms, send_sms_code, send_email_code) from app import (firetext_client, aws_ses_client, encryption) from app.clients.sms.firetext import FiretextClientException @@ -98,17 +99,15 @@ def test_should_throw_firetext_client_exception(mocker): def test_should_send_email_code(mocker): - verification = {'to_address': 'someone@it.gov.uk', - 'from_address': 'no-reply@notify.gov.uk', - 'subject': 'Verification code', - 'body': 11111} + verification = {'to': 'someone@it.gov.uk', + 'secret_code': 11111} encrypted_verification = encryption.encrypt(verification) mocker.patch('app.aws_ses_client.send_email') send_email_code(encrypted_verification) - aws_ses_client.send_email.assert_called_once_with(verification['from_address'], - verification['to_address'], - verification['subject'], - verification['body']) + aws_ses_client.send_email.assert_called_once_with(current_app.config['VERIFY_CODE_FROM_EMAIL_ADDRESS'], + verification['to'], + "Verification code", + verification['secret_code']) diff --git a/tests/app/conftest.py b/tests/app/conftest.py index e19a384bc..3f12fc1a0 100644 --- a/tests/app/conftest.py +++ b/tests/app/conftest.py @@ -32,7 +32,8 @@ def sample_user(notify_db, 'email_address': email, 'password': 'password', 'mobile_number': '+447700900986', - 'state': 'active' + 'state': 'active', + 'permissions': [] } usr = User.query.filter_by(email_address=email).first() if not usr: diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index e9153d8a3..6a5f1fcde 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -1,6 +1,7 @@ import json from flask import url_for from app.models import (User, Service) +from app.dao.users_dao import save_model_user from tests import create_authorization_header from tests.app.conftest import sample_service as create_sample_service @@ -27,7 +28,8 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sa "password_changed_at": None, "logged_in_at": None, "state": "active", - "failed_login_count": 0 + "failed_login_count": 0, + "permissions": [] } assert expected in json_resp['data'] @@ -54,7 +56,8 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_ "password_changed_at": None, "logged_in_at": None, "state": "active", - "failed_login_count": 0 + "failed_login_count": 0, + "permissions": [] } assert json_resp['data'] == expected @@ -74,7 +77,8 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic "password_changed_at": None, "logged_in_at": None, "state": "active", - "failed_login_count": 0 + "failed_login_count": 0, + "permissions": [] } auth_header = create_authorization_header(service_id=sample_admin_service_id, path=url_for('user.create_user'), @@ -107,7 +111,8 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess "password_changed_at": None, "logged_in_at": None, "state": "active", - "failed_login_count": 0 + "failed_login_count": 0, + "permissions": [] } auth_header = create_authorization_header(service_id=sample_admin_service_id, path=url_for('user.create_user'), @@ -138,7 +143,8 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s "password_changed_at": None, "logged_in_at": None, "state": "active", - "failed_login_count": 0 + "failed_login_count": 0, + "permissions": [] } auth_header = create_authorization_header(service_id=sample_admin_service_id, path=url_for('user.create_user'), @@ -166,7 +172,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_ data = { 'name': sample_user.name, 'email_address': new_email, - 'mobile_number': sample_user.mobile_number + 'mobile_number': sample_user.mobile_number, + 'permissions': [] } auth_header = create_authorization_header(service_id=sample_admin_service_id, path=url_for('user.update_user', user_id=sample_user.id), @@ -189,7 +196,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_ "id": user.id, "logged_in_at": None, "state": "active", - "failed_login_count": 0 + "failed_login_count": 0, + "permissions": [] } assert json_resp['data'] == expected assert json_resp['data']['email_address'] == new_email @@ -211,7 +219,8 @@ def test_put_user_update_password(notify_api, 'name': sample_user.name, 'email_address': sample_user.email_address, 'mobile_number': sample_user.mobile_number, - 'password': new_password + 'password': new_password, + 'permissions': [] } auth_header = create_authorization_header(service_id=sample_admin_service_id, path=url_for('user.update_user', user_id=sample_user.id), @@ -248,22 +257,140 @@ def test_put_user_not_exists(notify_api, notify_db, notify_db_session, sample_us assert User.query.count() == 2 new_email = 'new@digital.cabinet-office.gov.uk' data = {'email_address': new_email} - auth_header = create_authorization_header( - service_id=sample_admin_service_id, - path=url_for('user.update_user', user_id="9999"), - method='PUT', - request_body=json.dumps(data)) + auth_header = create_authorization_header(service_id=sample_admin_service_id, + path=url_for('user.update_user', user_id="9999"), + method='PUT', + request_body=json.dumps(data)) headers = [('Content-Type', 'application/json'), auth_header] resp = client.put( url_for('user.update_user', user_id="9999"), data=json.dumps(data), headers=headers) - json_resp = json.loads(resp.get_data(as_text=True)) - print(json_resp) assert resp.status_code == 404 assert User.query.count() == 2 user = User.query.filter_by(id=sample_user.id).first() json_resp = json.loads(resp.get_data(as_text=True)) - assert json_resp == {'result': 'error', 'message': 'User not found'} + assert json_resp['result'] == "error" + assert json_resp['message'] == "User not found" + assert user == sample_user assert user.email_address != new_email + + +def test_post_with_permissions(notify_api, notify_db, notify_db_session, sample_admin_service_id): + """ + Tests POST endpoint '/' to create a user with permissions. + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + assert User.query.count() == 1 + permissions = ['new permission'] + data = { + "name": "Test User", + "email_address": "user@digital.cabinet-office.gov.uk", + "password": "password", + "mobile_number": "+447700900986", + "password_changed_at": None, + "logged_in_at": None, + "state": "active", + "failed_login_count": 0, + "permissions": permissions + } + auth_header = create_authorization_header(service_id=sample_admin_service_id, + path=url_for('user.create_user'), + method='POST', + request_body=json.dumps(data)) + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.post( + url_for('user.create_user'), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 201 + user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first() + json_resp = json.loads(resp.get_data(as_text=True)) + json_resp['data'] == {"email_address": user.email_address, "id": user.id} + assert json_resp['data']['email_address'] == user.email_address + assert json_resp['data']['id'] == user.id + assert json_resp['data']['permissions'] == permissions + + +def test_put_add_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id): + """ + Tests PUT endpoint '/' to update user permissions. + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + permissions = ['one permission', 'another permission'] + data = { + 'name': sample_user.name, + 'email_address': sample_user.email_address, + 'mobile_number': sample_user.mobile_number, + 'permissions': permissions + } + auth_header = create_authorization_header(service_id=sample_admin_service_id, + path=url_for('user.update_user', user_id=sample_user.id), + method='PUT', + request_body=json.dumps(data)) + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.put( + url_for('user.update_user', user_id=sample_user.id), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 200 + assert User.query.count() == 2 + user = User.query.filter_by(email_address=sample_user.email_address).first() + json_resp = json.loads(resp.get_data(as_text=True)) + expected = { + "name": user.name, + "email_address": user.email_address, + "mobile_number": user.mobile_number, + "password_changed_at": None, + "id": user.id, + "logged_in_at": None, + "state": user.state, + "failed_login_count": 0, + "permissions": permissions + } + assert json_resp['data'] == expected + + +def test_put_remove_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id): + """ + Tests PUT endpoint '/' to update user permissions. + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + old_permissions = ['one permission', 'another permission'] + save_model_user(sample_user, {'permissions': old_permissions}) + permissions = ['new permissions'] + data = { + 'name': sample_user.name, + 'email_address': sample_user.email_address, + 'mobile_number': sample_user.mobile_number, + 'permissions': permissions + } + auth_header = create_authorization_header(service_id=sample_admin_service_id, + path=url_for('user.update_user', user_id=sample_user.id), + method='PUT', + request_body=json.dumps(data)) + headers = [('Content-Type', 'application/json'), auth_header] + resp = client.put( + url_for('user.update_user', user_id=sample_user.id), + data=json.dumps(data), + headers=headers) + assert resp.status_code == 200 + assert User.query.count() == 2 + user = User.query.filter_by(email_address=sample_user.email_address).first() + json_resp = json.loads(resp.get_data(as_text=True)) + expected = { + "name": user.name, + "email_address": user.email_address, + "mobile_number": user.mobile_number, + "password_changed_at": None, + "id": user.id, + "logged_in_at": None, + "state": user.state, + "failed_login_count": 0, + "permissions": permissions + } + assert json_resp['data'] == expected diff --git a/tests/app/user/test_rest_verify.py b/tests/app/user/test_rest_verify.py index e6759f45b..5d6dc544c 100644 --- a/tests/app/user/test_rest_verify.py +++ b/tests/app/user/test_rest_verify.py @@ -2,9 +2,7 @@ import json import moto from datetime import (datetime, timedelta) from flask import url_for - from app.models import (VerifyCode) - import app.celery.tasks from app import db, encryption from tests import create_authorization_header @@ -191,7 +189,6 @@ def test_user_verify_password_valid_password_resets_failed_logins(notify_api, notify_db, notify_db_session, sample_user): - with notify_api.test_request_context(): with notify_api.test_client() as client: data = json.dumps({'password': 'bad password'}) @@ -249,9 +246,10 @@ def test_user_verify_password_missing_password(notify_api, assert 'Required field missing data' in json_resp['message']['password'] +# TODO: Remove this test once the admin app has stopped using it. def test_send_user_code_for_sms(notify_api, sample_sms_code, - mock_secret_code, + mock_encryption, mock_celery_send_sms_code): """ Tests POST endpoint '//code' successful sms @@ -269,35 +267,11 @@ def test_send_user_code_for_sms(notify_api, headers=[('Content-Type', 'application/json'), auth_header]) assert resp.status_code == 204 - encrpyted = encryption.encrypt({'to': sample_sms_code.user.mobile_number, 'secret_code': '11111'}) - app.celery.tasks.send_sms_code.apply_async.assert_called_once_with([encrpyted], queue='sms-code') - - -def test_send_user_code_for_sms_with_optional_to_field(notify_api, - sample_sms_code, - mock_secret_code, - mock_celery_send_sms_code): - """ - Tests POST endpoint '//code' successful sms with optional to field - """ - with notify_api.test_request_context(): - with notify_api.test_client() as client: - - data = json.dumps({'code_type': 'sms', 'to': '+441119876757'}) - auth_header = create_authorization_header( - path=url_for('user.send_user_code', user_id=sample_sms_code.user.id), - method='POST', - request_body=data) - resp = client.post( - url_for('user.send_user_code', user_id=sample_sms_code.user.id), - data=data, - headers=[('Content-Type', 'application/json'), auth_header]) - - assert resp.status_code == 204 - encrypted = encryption.encrypt({'to': '+441119876757', 'secret_code': '11111'}) - app.celery.tasks.send_sms_code.apply_async.assert_called_once_with([encrypted], queue='sms-code') + app.celery.tasks.send_sms_code.apply_async.assert_called_once_with(['something_encrypted'], + queue='sms-code') +# TODO: Remove this test once the admin app has stopped using it. def test_send_user_code_for_email(notify_api, sample_email_code, mock_secret_code, @@ -323,6 +297,7 @@ def test_send_user_code_for_email(notify_api, queue='email-code') +# TODO: Remove this test once the admin app has stopped using it. def test_send_user_code_for_email_uses_optional_to_field(notify_api, sample_email_code, mock_secret_code, @@ -348,16 +323,130 @@ def test_send_user_code_for_email_uses_optional_to_field(notify_api, queue='email-code') -def test_request_verify_code_schema_invalid_code_type(notify_api, notify_db, notify_db_session, sample_user): - from app.schemas import request_verify_code_schema +# TODO: Remove this test once the admin app has stopped using it. +def test_request_verify_code_schema_invalid_code_type(notify_api, sample_user): + from app.schemas import old_request_verify_code_schema data = json.dumps({'code_type': 'not_sms'}) - code, error = request_verify_code_schema.loads(data) + code, error = old_request_verify_code_schema.loads(data) assert error == {'code_type': ['Invalid code type']} -def test_request_verify_code_schema_with_to(notify_api, notify_db, notify_db_session, sample_user): - from app.schemas import request_verify_code_schema +# TODO: Remove this method once the admin app has stopped using it. +def test_request_verify_code_schema_with_to(notify_api, sample_user): + from app.schemas import old_request_verify_code_schema data = json.dumps({'code_type': 'sms', 'to': 'some@one.gov.uk'}) - code, error = request_verify_code_schema.loads(data) + code, error = old_request_verify_code_schema.loads(data) assert code == {'code_type': 'sms', 'to': 'some@one.gov.uk'} assert error == {} + + +def test_send_user_sms_code(notify_api, + sample_sms_code, + mock_celery_send_sms_code, + mock_encryption): + """ + Tests POST endpoint /user//sms-code + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + data = json.dumps({}) + auth_header = create_authorization_header( + path=url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id), + method='POST', + request_body=data) + resp = client.post( + url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + print(resp.get_data(as_text=True)) + assert resp.status_code == 204 + app.celery.tasks.send_sms_code.apply_async.assert_called_once_with(['something_encrypted'], + queue='sms-code') + + +def test_send_user_code_for_sms_with_optional_to_field(notify_api, + sample_sms_code, + mock_secret_code, + mock_celery_send_sms_code): + """ + Tests POST endpoint '//code' successful sms with optional to field + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + data = json.dumps({'to': '+441119876757'}) + auth_header = create_authorization_header( + path=url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id), + method='POST', + request_body=data) + resp = client.post( + url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + + assert resp.status_code == 204 + encrypted = encryption.encrypt({'to': '+441119876757', 'secret_code': '11111'}) + app.celery.tasks.send_sms_code.apply_async.assert_called_once_with([encrypted], queue='sms-code') + + +def test_send_sms_code_returns_404_for_bad_input_data(notify_api, notify_db, notify_db_session): + """ + Tests POST endpoint /user//sms-code return 404 for bad input data + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + data = json.dumps({}) + import uuid + uuid_ = uuid.uuid4() + auth_header = create_authorization_header( + path=url_for('user.send_user_sms_code', user_id=uuid_), + method='POST', + request_body=data) + resp = client.post( + url_for('user.send_user_sms_code', user_id=uuid_), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 404 + assert json.loads(resp.get_data(as_text=True))['message'] == 'No user found' + + +def test_send_user_email_code(notify_api, + sample_email_code, + mock_celery_send_email_code, + mock_encryption): + """ + Tests POST endpoint /user//email-code + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + data = json.dumps({}) + auth_header = create_authorization_header( + path=url_for('user.send_user_email_code', user_id=sample_email_code.user.id), + method='POST', + request_body=data) + resp = client.post( + url_for('user.send_user_email_code', user_id=sample_email_code.user.id), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + print(resp.get_data(as_text=True)) + assert resp.status_code == 204 + app.celery.tasks.send_email_code.apply_async.assert_called_once_with(['something_encrypted'], + queue='email-code') + + +def test_send_user_email_code_returns_404_for_when_user_does_not_exist(notify_api, notify_db, notify_db_session): + """ + Tests POST endpoint /user//email-code return 404 for missing user + """ + with notify_api.test_request_context(): + with notify_api.test_client() as client: + data = json.dumps({}) + auth_header = create_authorization_header( + path=url_for('user.send_user_email_code', user_id=1), + method='POST', + request_body=data) + resp = client.post( + url_for('user.send_user_email_code', user_id=1), + data=data, + headers=[('Content-Type', 'application/json'), auth_header]) + assert resp.status_code == 404 + assert json.loads(resp.get_data(as_text=True))['message'] == 'No user found'