mirror of
https://github.com/GSA/notifications-api.git
synced 2026-05-27 09:28:03 -04:00
Merge pull request #1371 from alphagov/email-2fa
Email 2fa api endpoints
This commit is contained in:
@@ -125,7 +125,8 @@ class Config(object):
|
||||
NOTIFY_USER_ID = '6af522d0-2915-4e52-83a3-3690455a5fe6'
|
||||
INVITATION_EMAIL_TEMPLATE_ID = '4f46df42-f795-4cc4-83bb-65ca312f49cc'
|
||||
SMS_CODE_TEMPLATE_ID = '36fb0730-6259-4da1-8a80-c8de22ad4246'
|
||||
EMAIL_VERIFY_CODE_TEMPLATE_ID = 'ece42649-22a8-4d06-b87f-d52d5d3f0a27'
|
||||
EMAIL_2FA_TEMPLATE_ID = '299726d2-dba6-42b8-8209-30e1d66ea164'
|
||||
NEW_USER_EMAIL_VERIFICATION_TEMPLATE_ID = 'ece42649-22a8-4d06-b87f-d52d5d3f0a27'
|
||||
PASSWORD_RESET_TEMPLATE_ID = '474e9242-823b-4f99-813d-ed392e7f1201'
|
||||
ALREADY_REGISTERED_EMAIL_TEMPLATE_ID = '0880fbb1-a0c6-46f0-9a8e-36c986381ceb'
|
||||
CHANGE_EMAIL_CONFIRMATION_TEMPLATE_ID = 'eb4d9930-87ab-4aef-9bce-786762687884'
|
||||
|
||||
@@ -47,12 +47,7 @@ def get_user_code(user, code, code_type):
|
||||
codes = VerifyCode.query.filter_by(
|
||||
user=user, code_type=code_type).order_by(
|
||||
VerifyCode.created_at.desc())
|
||||
retval = None
|
||||
for x in codes:
|
||||
if x.check_code(code):
|
||||
retval = x
|
||||
break
|
||||
return retval
|
||||
return next((x for x in codes if x.check_code(code)), None)
|
||||
|
||||
|
||||
def delete_codes_older_created_more_than_a_day_ago():
|
||||
|
||||
@@ -366,14 +366,6 @@ class JobSchema(BaseSchema):
|
||||
strict = True
|
||||
|
||||
|
||||
class RequestVerifyCodeSchema(ma.Schema):
|
||||
|
||||
class Meta:
|
||||
strict = True
|
||||
|
||||
to = fields.Str(required=False)
|
||||
|
||||
|
||||
class NotificationSchema(ma.Schema):
|
||||
|
||||
class Meta:
|
||||
@@ -653,7 +645,6 @@ api_key_schema = ApiKeySchema()
|
||||
api_key_schema_load_json = ApiKeySchema(load_json=True)
|
||||
job_schema = JobSchema()
|
||||
job_schema_load_json = JobSchema(load_json=True)
|
||||
request_verify_code_schema = RequestVerifyCodeSchema()
|
||||
sms_admin_notification_schema = SmsAdminNotificationSchema()
|
||||
sms_template_notification_schema = SmsTemplateNotificationSchema()
|
||||
job_sms_template_notification_schema = JobSmsTemplateNotificationSchema()
|
||||
|
||||
149
app/user/rest.py
149
app/user/rest.py
@@ -1,8 +1,9 @@
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from flask import (jsonify, request, Blueprint, current_app)
|
||||
from flask import (jsonify, request, Blueprint, current_app, abort)
|
||||
|
||||
from app.config import QueueNames
|
||||
from app.dao.users_dao import (
|
||||
@@ -22,7 +23,7 @@ from app.dao.users_dao import (
|
||||
from app.dao.permissions_dao import permission_dao
|
||||
from app.dao.services_dao import dao_fetch_service_by_id
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.models import SMS_TYPE, KEY_TYPE_NORMAL, EMAIL_TYPE, Service
|
||||
from app.models import KEY_TYPE_NORMAL, Service, SMS_TYPE, EMAIL_TYPE
|
||||
from app.notifications.process_notifications import (
|
||||
persist_notification,
|
||||
send_notification_to_queue
|
||||
@@ -30,7 +31,6 @@ from app.notifications.process_notifications import (
|
||||
from app.schemas import (
|
||||
email_data_request_schema,
|
||||
user_schema,
|
||||
request_verify_code_schema,
|
||||
permission_schema,
|
||||
user_schema_load_json,
|
||||
user_update_schema_load_json,
|
||||
@@ -41,6 +41,12 @@ from app.errors import (
|
||||
InvalidRequest
|
||||
)
|
||||
from app.utils import url_with_token
|
||||
from app.user.users_schema import (
|
||||
post_verify_code_schema,
|
||||
post_send_user_sms_code_schema,
|
||||
post_send_user_email_code_schema,
|
||||
)
|
||||
from app.schema_validation import validate
|
||||
|
||||
user_blueprint = Blueprint('user', __name__)
|
||||
register_errors(user_blueprint)
|
||||
@@ -115,68 +121,99 @@ def verify_user_password(user_id):
|
||||
|
||||
@user_blueprint.route('/<uuid:user_id>/verify/code', methods=['POST'])
|
||||
def verify_user_code(user_id):
|
||||
data = request.get_json()
|
||||
validate(data, post_verify_code_schema)
|
||||
|
||||
user_to_verify = get_user_by_id(user_id=user_id)
|
||||
|
||||
req_json = request.get_json()
|
||||
verify_code = None
|
||||
code_type = None
|
||||
errors = {}
|
||||
try:
|
||||
verify_code = req_json['code']
|
||||
except KeyError:
|
||||
errors.update({'code': ['Required field missing data']})
|
||||
try:
|
||||
code_type = req_json['code_type']
|
||||
except KeyError:
|
||||
errors.update({'code_type': ['Required field missing data']})
|
||||
if errors:
|
||||
raise InvalidRequest(errors, status_code=400)
|
||||
|
||||
code = get_user_code(user_to_verify, verify_code, code_type)
|
||||
code = get_user_code(user_to_verify, data['code'], data['code_type'])
|
||||
if user_to_verify.failed_login_count >= current_app.config.get('MAX_VERIFY_CODE_COUNT'):
|
||||
raise InvalidRequest("Code not found", status_code=404)
|
||||
if not code:
|
||||
# only relevant from sms
|
||||
increment_failed_login_count(user_to_verify)
|
||||
raise InvalidRequest("Code not found", status_code=404)
|
||||
if datetime.utcnow() > code.expiry_datetime or code.code_used:
|
||||
# sms and email
|
||||
increment_failed_login_count(user_to_verify)
|
||||
raise InvalidRequest("Code has expired", status_code=400)
|
||||
|
||||
if code_type == 'sms':
|
||||
user_to_verify.current_session_id = str(uuid.uuid4())
|
||||
user_to_verify.logged_in_at = datetime.utcnow()
|
||||
user_to_verify.failed_login_count = 0
|
||||
save_model_user(user_to_verify)
|
||||
user_to_verify.current_session_id = str(uuid.uuid4())
|
||||
user_to_verify.logged_in_at = datetime.utcnow()
|
||||
user_to_verify.failed_login_count = 0
|
||||
save_model_user(user_to_verify)
|
||||
|
||||
use_user_code(code.id)
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
@user_blueprint.route('/<uuid:user_id>/sms-code', methods=['POST'])
|
||||
def send_user_sms_code(user_id):
|
||||
@user_blueprint.route('/<uuid:user_id>/<code_type>-code', methods=['POST'])
|
||||
def send_user_2fa_code(user_id, code_type):
|
||||
user_to_send_to = get_user_by_id(user_id=user_id)
|
||||
verify_code, errors = request_verify_code_schema.load(request.get_json())
|
||||
|
||||
if count_user_verify_codes(user_to_send_to) >= current_app.config.get('MAX_VERIFY_CODE_COUNT'):
|
||||
# Prevent more than `MAX_VERIFY_CODE_COUNT` active verify codes at a time
|
||||
current_app.logger.warn('Max verify code has exceeded for user {}'.format(user_to_send_to.id))
|
||||
return jsonify({}), 204
|
||||
current_app.logger.warn('Too many verify codes created for user {}'.format(user_to_send_to.id))
|
||||
else:
|
||||
data = request.get_json()
|
||||
if code_type == SMS_TYPE:
|
||||
validate(data, post_send_user_sms_code_schema)
|
||||
send_user_sms_code(user_to_send_to, data)
|
||||
elif code_type == EMAIL_TYPE:
|
||||
validate(data, post_send_user_email_code_schema)
|
||||
send_user_email_code(user_to_send_to, data)
|
||||
else:
|
||||
abort(404)
|
||||
|
||||
return '{}', 204
|
||||
|
||||
|
||||
def send_user_sms_code(user_to_send_to, data):
|
||||
recipient = data.get('to') or user_to_send_to.mobile_number
|
||||
|
||||
secret_code = create_secret_code()
|
||||
create_user_code(user_to_send_to, secret_code, SMS_TYPE)
|
||||
personalisation = {'verify_code': secret_code}
|
||||
|
||||
mobile = user_to_send_to.mobile_number if verify_code.get('to', None) is None else verify_code.get('to')
|
||||
sms_code_template_id = current_app.config['SMS_CODE_TEMPLATE_ID']
|
||||
sms_code_template = dao_get_template_by_id(sms_code_template_id)
|
||||
service = Service.query.get(current_app.config['NOTIFY_SERVICE_ID'])
|
||||
create_2fa_code(
|
||||
current_app.config['SMS_CODE_TEMPLATE_ID'],
|
||||
user_to_send_to,
|
||||
secret_code,
|
||||
recipient,
|
||||
personalisation
|
||||
)
|
||||
|
||||
|
||||
def send_user_email_code(user_to_send_to, data):
|
||||
recipient = user_to_send_to.email_address
|
||||
|
||||
secret_code = str(uuid.uuid4())
|
||||
personalisation = {
|
||||
'name': user_to_send_to.name,
|
||||
'url': _create_2fa_url(user_to_send_to, secret_code, data.get('next'))
|
||||
}
|
||||
|
||||
create_2fa_code(
|
||||
current_app.config['EMAIL_2FA_TEMPLATE_ID'],
|
||||
user_to_send_to,
|
||||
secret_code,
|
||||
recipient,
|
||||
personalisation
|
||||
)
|
||||
|
||||
|
||||
def create_2fa_code(template_id, user_to_send_to, secret_code, recipient, personalisation):
|
||||
template = dao_get_template_by_id(template_id)
|
||||
|
||||
# save the code in the VerifyCode table
|
||||
create_user_code(user_to_send_to, secret_code, template.template_type)
|
||||
|
||||
saved_notification = persist_notification(
|
||||
template_id=sms_code_template_id,
|
||||
template_version=sms_code_template.version,
|
||||
recipient=mobile,
|
||||
service=service,
|
||||
personalisation={'verify_code': secret_code},
|
||||
notification_type=SMS_TYPE,
|
||||
template_id=template.id,
|
||||
template_version=template.version,
|
||||
recipient=recipient,
|
||||
service=template.service,
|
||||
personalisation=personalisation,
|
||||
notification_type=template.template_type,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL
|
||||
)
|
||||
@@ -185,8 +222,6 @@ def send_user_sms_code(user_id):
|
||||
# admin even if we're doing user research using this service:
|
||||
send_notification_to_queue(saved_notification, False, queue=QueueNames.NOTIFY)
|
||||
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
@user_blueprint.route('/<uuid:user_id>/change-email-verification', methods=['POST'])
|
||||
def send_user_confirm_new_email(user_id):
|
||||
@@ -208,7 +243,7 @@ def send_user_confirm_new_email(user_id):
|
||||
'url': _create_confirmation_url(user=user_to_send_to, email_address=email['email']),
|
||||
'feedback_url': current_app.config['ADMIN_BASE_URL'] + '/support'
|
||||
},
|
||||
notification_type=EMAIL_TYPE,
|
||||
notification_type=template.template_type,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL
|
||||
)
|
||||
@@ -218,12 +253,11 @@ def send_user_confirm_new_email(user_id):
|
||||
|
||||
|
||||
@user_blueprint.route('/<uuid:user_id>/email-verification', methods=['POST'])
|
||||
def send_user_email_verification(user_id):
|
||||
def send_new_user_email_verification(user_id):
|
||||
# when registering, we verify all users' email addresses using this function
|
||||
user_to_send_to = get_user_by_id(user_id=user_id)
|
||||
secret_code = create_secret_code()
|
||||
create_user_code(user_to_send_to, secret_code, 'email')
|
||||
|
||||
template = dao_get_template_by_id(current_app.config['EMAIL_VERIFY_CODE_TEMPLATE_ID'])
|
||||
template = dao_get_template_by_id(current_app.config['NEW_USER_EMAIL_VERIFICATION_TEMPLATE_ID'])
|
||||
service = Service.query.get(current_app.config['NOTIFY_SERVICE_ID'])
|
||||
|
||||
saved_notification = persist_notification(
|
||||
@@ -233,9 +267,9 @@ def send_user_email_verification(user_id):
|
||||
service=service,
|
||||
personalisation={
|
||||
'name': user_to_send_to.name,
|
||||
'url': _create_verification_url(user_to_send_to, secret_code)
|
||||
'url': _create_verification_url(user_to_send_to)
|
||||
},
|
||||
notification_type=EMAIL_TYPE,
|
||||
notification_type=template.template_type,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL
|
||||
)
|
||||
@@ -261,7 +295,7 @@ def send_already_registered_email(user_id):
|
||||
'forgot_password_url': current_app.config['ADMIN_BASE_URL'] + '/forgot-password',
|
||||
'feedback_url': current_app.config['ADMIN_BASE_URL'] + '/support'
|
||||
},
|
||||
notification_type=EMAIL_TYPE,
|
||||
notification_type=template.template_type,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL
|
||||
)
|
||||
@@ -323,7 +357,7 @@ def send_user_reset_password():
|
||||
'user_name': user_to_send_to.name,
|
||||
'url': _create_reset_password_url(user_to_send_to.email_address)
|
||||
},
|
||||
notification_type=EMAIL_TYPE,
|
||||
notification_type=template.template_type,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL
|
||||
)
|
||||
@@ -351,8 +385,8 @@ def _create_reset_password_url(email):
|
||||
return url_with_token(data, url, current_app.config)
|
||||
|
||||
|
||||
def _create_verification_url(user, secret_code):
|
||||
data = json.dumps({'user_id': str(user.id), 'email': user.email_address, 'secret_code': secret_code})
|
||||
def _create_verification_url(user):
|
||||
data = json.dumps({'user_id': str(user.id), 'email': user.email_address})
|
||||
url = '/verify-email/'
|
||||
return url_with_token(data, url, current_app.config)
|
||||
|
||||
@@ -361,3 +395,12 @@ def _create_confirmation_url(user, email_address):
|
||||
data = json.dumps({'user_id': str(user.id), 'email': email_address})
|
||||
url = '/user-profile/email/confirm/'
|
||||
return url_with_token(data, url, current_app.config)
|
||||
|
||||
|
||||
def _create_2fa_url(user, secret_code, next_redir):
|
||||
data = json.dumps({'user_id': str(user.id), 'secret_code': secret_code})
|
||||
url = '/email-auth/'
|
||||
ret = url_with_token(data, url, current_app.config)
|
||||
if next_redir:
|
||||
ret += '?{}'.format(urlencode({'next': next_redir}))
|
||||
return ret
|
||||
|
||||
41
app/user/users_schema.py
Normal file
41
app/user/users_schema.py
Normal file
@@ -0,0 +1,41 @@
|
||||
post_verify_code_schema = {
|
||||
'$schema': 'http://json-schema.org/draft-04/schema#',
|
||||
'description': 'POST schema for verifying a 2fa code',
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'code': {'type': 'string'},
|
||||
'code_type': {'type': 'string'},
|
||||
},
|
||||
'required': ['code', 'code_type'],
|
||||
'additionalProperties': False
|
||||
}
|
||||
|
||||
|
||||
post_send_user_email_code_schema = {
|
||||
'$schema': 'http://json-schema.org/draft-04/schema#',
|
||||
'description': (
|
||||
'POST schema for generating a 2fa email - "to" is required for legacy purposes. '
|
||||
'"next" is an optional url to redirect to on sign in'
|
||||
),
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
# doesn't need 'to' as we'll just grab user.email_address. but lets keep it
|
||||
# as allowed to keep admin code cleaner, but only as null to prevent confusion
|
||||
'to': {'type': 'null'},
|
||||
'next': {'type': ['string', 'null']},
|
||||
},
|
||||
'required': [],
|
||||
'additionalProperties': False
|
||||
}
|
||||
|
||||
|
||||
post_send_user_sms_code_schema = {
|
||||
'$schema': 'http://json-schema.org/draft-04/schema#',
|
||||
'description': 'POST schema for generating a 2fa sms',
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'to': {'type': ['string', 'null']},
|
||||
},
|
||||
'required': [],
|
||||
'additionalProperties': False
|
||||
}
|
||||
71
migrations/versions/0134_add_email_2fa_template_.py
Normal file
71
migrations/versions/0134_add_email_2fa_template_.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""
|
||||
|
||||
Revision ID: 0134_add_email_2fa_template
|
||||
Revises: 0133_set_services_sms_prefix
|
||||
Create Date: 2017-11-03 13:52:59.715203
|
||||
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
from alembic import op
|
||||
from flask import current_app
|
||||
|
||||
|
||||
revision = '0134_add_email_2fa_template'
|
||||
down_revision = '0133_set_services_sms_prefix'
|
||||
|
||||
template_id = '299726d2-dba6-42b8-8209-30e1d66ea164'
|
||||
|
||||
|
||||
def upgrade():
|
||||
template_insert = """
|
||||
INSERT INTO templates (id, name, template_type, created_at, content, archived, service_id, subject, created_by_id, version, process_type)
|
||||
VALUES ('{}', '{}', '{}', '{}', '{}', False, '{}', '{}', '{}', 1, '{}')
|
||||
"""
|
||||
template_history_insert = """
|
||||
INSERT INTO templates_history (id, name, template_type, created_at, content, archived, service_id, subject, created_by_id, version, process_type)
|
||||
VALUES ('{}', '{}', '{}', '{}', '{}', False, '{}', '{}', '{}', 1, '{}')
|
||||
"""
|
||||
|
||||
template_content = '\n'.join([
|
||||
'Hi ((name)),',
|
||||
'',
|
||||
'To sign in to GOV.UK Notify please open this link:',
|
||||
'((url))',
|
||||
])
|
||||
|
||||
template_name = "Notify email verify code"
|
||||
template_subject = 'Sign in to GOV.UK Notify'
|
||||
|
||||
op.execute(
|
||||
template_history_insert.format(
|
||||
template_id,
|
||||
template_name,
|
||||
'email',
|
||||
datetime.utcnow(),
|
||||
template_content,
|
||||
current_app.config['NOTIFY_SERVICE_ID'],
|
||||
template_subject,
|
||||
current_app.config['NOTIFY_USER_ID'],
|
||||
'normal'
|
||||
)
|
||||
)
|
||||
|
||||
op.execute(
|
||||
template_insert.format(
|
||||
template_id,
|
||||
template_name,
|
||||
'email',
|
||||
datetime.utcnow(),
|
||||
template_content,
|
||||
current_app.config['NOTIFY_SERVICE_ID'],
|
||||
template_subject,
|
||||
current_app.config['NOTIFY_USER_ID'],
|
||||
'normal'
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.execute("DELETE FROM templates_history WHERE id = '{}'".format(template_id))
|
||||
op.execute("DELETE FROM templates WHERE id = '{}'".format(template_id))
|
||||
@@ -864,6 +864,24 @@ def sms_code_template(notify_db,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def email_2fa_code_template(notify_db, notify_db_session):
|
||||
service, user = notify_service(notify_db, notify_db_session)
|
||||
return create_custom_template(
|
||||
service=service,
|
||||
user=user,
|
||||
template_config_name='EMAIL_2FA_TEMPLATE_ID',
|
||||
content=(
|
||||
'Hi ((name)),'
|
||||
''
|
||||
'To sign in to GOV.UK Notify please open this link:'
|
||||
'((url))'
|
||||
),
|
||||
subject='Sign in to GOV.UK Notify',
|
||||
template_type='email'
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def email_verification_template(notify_db,
|
||||
notify_db_session):
|
||||
@@ -871,7 +889,7 @@ def email_verification_template(notify_db,
|
||||
return create_custom_template(
|
||||
service=service,
|
||||
user=user,
|
||||
template_config_name='EMAIL_VERIFY_CODE_TEMPLATE_ID',
|
||||
template_config_name='NEW_USER_EMAIL_VERIFICATION_TEMPLATE_ID',
|
||||
content='((user_name)) use ((url)) to complete registration',
|
||||
template_type='email'
|
||||
)
|
||||
|
||||
@@ -115,6 +115,7 @@ def test_get_all_invited_users_by_service(client, notify_db, notify_db_session,
|
||||
for invite in json_resp['data']:
|
||||
assert invite['service'] == str(sample_service.id)
|
||||
assert invite['from_user'] == str(invite_from.id)
|
||||
assert invite['auth_type'] == SMS_AUTH_TYPE
|
||||
assert invite['id']
|
||||
|
||||
|
||||
|
||||
@@ -9,11 +9,14 @@ import pytest
|
||||
from flask import url_for, current_app
|
||||
from freezegun import freeze_time
|
||||
|
||||
from app.dao.users_dao import create_user_code
|
||||
from app.dao.services_dao import dao_update_service, dao_fetch_service_by_id
|
||||
from app.models import (
|
||||
VerifyCode,
|
||||
Notification,
|
||||
User,
|
||||
Notification
|
||||
VerifyCode,
|
||||
EMAIL_TYPE,
|
||||
SMS_TYPE
|
||||
)
|
||||
from app import db
|
||||
import app.celery.tasks
|
||||
@@ -40,25 +43,6 @@ def test_user_verify_sms_code(client, sample_sms_code):
|
||||
assert sample_sms_code.user.current_session_id is not None
|
||||
|
||||
|
||||
@freeze_time('2016-01-01T12:00:00')
|
||||
def test_user_verify_email_code(client, sample_email_code):
|
||||
sample_email_code.user.logged_in_at = datetime.utcnow() - timedelta(days=1)
|
||||
assert not VerifyCode.query.first().code_used
|
||||
assert sample_email_code.user.current_session_id is None
|
||||
data = json.dumps({
|
||||
'code_type': sample_email_code.code_type,
|
||||
'code': sample_email_code.txt_code})
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.post(
|
||||
url_for('user.verify_user_code', user_id=sample_email_code.user.id),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
assert VerifyCode.query.first().code_used
|
||||
assert sample_email_code.user.logged_in_at == datetime.utcnow() - timedelta(days=1)
|
||||
assert sample_email_code.user.current_session_id is None
|
||||
|
||||
|
||||
def test_user_verify_code_missing_code(client,
|
||||
sample_sms_code):
|
||||
assert not VerifyCode.query.first().code_used
|
||||
@@ -201,17 +185,15 @@ def test_send_user_sms_code(client,
|
||||
mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_user.id),
|
||||
url_for('user.send_user_2fa_code', code_type='sms', user_id=sample_user.id),
|
||||
data=json.dumps({}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
|
||||
assert mocked.call_count == 1
|
||||
assert VerifyCode.query.count() == 1
|
||||
assert VerifyCode.query.first().check_code('11111')
|
||||
assert VerifyCode.query.one().check_code('11111')
|
||||
|
||||
assert Notification.query.count() == 1
|
||||
notification = Notification.query.first()
|
||||
notification = Notification.query.one()
|
||||
assert notification.personalisation == {'verify_code': '11111'}
|
||||
assert notification.to == sample_user.mobile_number
|
||||
assert str(notification.service_id) == current_app.config['NOTIFY_SERVICE_ID']
|
||||
@@ -236,7 +218,7 @@ def test_send_user_code_for_sms_with_optional_to_field(client,
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_user.id),
|
||||
url_for('user.send_user_2fa_code', code_type='sms', user_id=sample_user.id),
|
||||
data=json.dumps({'to': to_number}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
@@ -254,7 +236,7 @@ def test_send_sms_code_returns_404_for_bad_input_data(client):
|
||||
uuid_ = uuid.uuid4()
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=uuid_),
|
||||
url_for('user.send_user_2fa_code', code_type='sms', user_id=uuid_),
|
||||
data=json.dumps({}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
@@ -275,25 +257,26 @@ def test_send_sms_code_returns_204_when_too_many_codes_already_created(client, s
|
||||
assert VerifyCode.query.count() == 10
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.post(
|
||||
url_for('user.send_user_sms_code', user_id=sample_user.id),
|
||||
url_for('user.send_user_2fa_code', code_type='sms', user_id=sample_user.id),
|
||||
data=json.dumps({}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
assert VerifyCode.query.count() == 10
|
||||
|
||||
|
||||
def test_send_user_email_verification(client,
|
||||
sample_user,
|
||||
mocker,
|
||||
email_verification_template):
|
||||
def test_send_new_user_email_verification(client,
|
||||
sample_user,
|
||||
mocker,
|
||||
email_verification_template):
|
||||
mocked = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.post(
|
||||
url_for('user.send_user_email_verification', user_id=str(sample_user.id)),
|
||||
url_for('user.send_new_user_email_verification', user_id=str(sample_user.id)),
|
||||
data=json.dumps({}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 204
|
||||
notification = Notification.query.first()
|
||||
assert VerifyCode.query.count() == 0
|
||||
mocked.assert_called_once_with(([str(notification.id)]), queue="notify-internal-tasks")
|
||||
|
||||
|
||||
@@ -305,7 +288,7 @@ def test_send_email_verification_returns_404_for_bad_input_data(client, notify_d
|
||||
uuid_ = uuid.uuid4()
|
||||
auth_header = create_authorization_header()
|
||||
resp = client.post(
|
||||
url_for('user.send_user_email_verification', user_id=uuid_),
|
||||
url_for('user.send_new_user_email_verification', user_id=uuid_),
|
||||
data=json.dumps({}),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
assert resp.status_code == 404
|
||||
@@ -355,3 +338,102 @@ def test_reset_failed_login_count_returns_404_when_user_does_not_exist(client):
|
||||
data={},
|
||||
headers=[('Content-Type', 'application/json'), create_authorization_header()])
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
def test_send_user_email_code(admin_request, mocker, sample_user, email_2fa_code_template):
|
||||
deliver_email = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
|
||||
data = {
|
||||
'to': None
|
||||
}
|
||||
admin_request.post(
|
||||
'user.send_user_2fa_code',
|
||||
code_type='email',
|
||||
user_id=sample_user.id,
|
||||
_data=data,
|
||||
_expected_status=204
|
||||
)
|
||||
noti = Notification.query.one()
|
||||
assert noti.to == sample_user.email_address
|
||||
assert str(noti.template_id) == current_app.config['EMAIL_2FA_TEMPLATE_ID']
|
||||
assert noti.personalisation['name'] == 'Test User'
|
||||
deliver_email.assert_called_once_with(
|
||||
[str(noti.id)],
|
||||
queue='notify-internal-tasks'
|
||||
)
|
||||
|
||||
|
||||
def test_send_user_email_code_with_urlencoded_next_param(admin_request, mocker, sample_user, email_2fa_code_template):
|
||||
mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
|
||||
data = {
|
||||
'to': None,
|
||||
'next': '/services'
|
||||
}
|
||||
admin_request.post(
|
||||
'user.send_user_2fa_code',
|
||||
code_type='email',
|
||||
user_id=sample_user.id,
|
||||
_data=data,
|
||||
_expected_status=204
|
||||
)
|
||||
noti = Notification.query.one()
|
||||
code = VerifyCode.query.one()
|
||||
assert noti.personalisation['url'].endswith('?next=%2Fservices')
|
||||
|
||||
|
||||
def test_send_email_code_returns_404_for_bad_input_data(admin_request):
|
||||
resp = admin_request.post(
|
||||
'user.send_user_2fa_code',
|
||||
code_type='email',
|
||||
user_id=uuid.uuid4(),
|
||||
_data={},
|
||||
_expected_status=404
|
||||
)
|
||||
assert resp['message'] == 'No result found'
|
||||
|
||||
|
||||
@freeze_time('2016-01-01T12:00:00')
|
||||
def test_user_verify_email_code(admin_request, sample_user):
|
||||
magic_code = str(uuid.uuid4())
|
||||
verify_code = create_user_code(sample_user, magic_code, EMAIL_TYPE)
|
||||
|
||||
data = {
|
||||
'code_type': 'email',
|
||||
'code': magic_code
|
||||
}
|
||||
|
||||
admin_request.post(
|
||||
'user.verify_user_code',
|
||||
user_id=sample_user.id,
|
||||
_data=data,
|
||||
_expected_status=204
|
||||
)
|
||||
|
||||
assert verify_code.code_used
|
||||
assert sample_user.logged_in_at == datetime.utcnow()
|
||||
assert sample_user.current_session_id is not None
|
||||
|
||||
|
||||
@pytest.mark.parametrize('code_type', [EMAIL_TYPE, SMS_TYPE])
|
||||
@freeze_time('2016-01-01T12:00:00')
|
||||
def test_user_verify_email_code_fails_if_code_already_used(admin_request, sample_user, code_type):
|
||||
magic_code = str(uuid.uuid4())
|
||||
verify_code = create_user_code(sample_user, magic_code, code_type)
|
||||
verify_code.code_used = True
|
||||
|
||||
data = {
|
||||
'code_type': code_type,
|
||||
'code': magic_code
|
||||
}
|
||||
|
||||
admin_request.post(
|
||||
'user.verify_user_code',
|
||||
user_id=sample_user.id,
|
||||
_data=data,
|
||||
_expected_status=400
|
||||
)
|
||||
|
||||
assert verify_code.code_used
|
||||
assert sample_user.logged_in_at is None
|
||||
assert sample_user.current_session_id is None
|
||||
|
||||
Reference in New Issue
Block a user