Files
notifications-api/tests/app/user/test_rest_verify.py

595 lines
20 KiB
Python
Raw Normal View History

2016-01-21 17:29:24 +00:00
import json
import uuid
2021-03-10 13:55:06 +00:00
from datetime import datetime, timedelta
import pytest
2021-03-10 13:55:06 +00:00
from flask import current_app, url_for
from freezegun import freeze_time
import app.celery.tasks
2021-03-10 13:55:06 +00:00
from app import db
2023-08-25 12:09:00 -07:00
from app.dao.services_dao import dao_fetch_service_by_id
2021-03-10 13:55:06 +00:00
from app.dao.users_dao import create_user_code
from app.models import (
EMAIL_TYPE,
SMS_TYPE,
USER_AUTH_TYPES,
Notification,
User,
VerifyCode,
)
from tests import create_admin_authorization_header
2016-01-21 17:29:24 +00:00
2023-08-29 14:54:30 -07:00
@freeze_time("2016-01-01T12:00:00")
def test_user_verify_sms_code(client, sample_sms_code):
    """Posting the correct SMS code returns 204 and updates the login state.

    Time is frozen so ``logged_in_at`` can be compared directly with
    ``datetime.utcnow()``.
    """
    sample_sms_code.user.logged_in_at = datetime.utcnow() - timedelta(days=1)
    # Preconditions: the stored code is unused and the user has no session.
    assert not VerifyCode.query.first().code_used
    assert sample_sms_code.user.current_session_id is None

    payload = json.dumps(
        {
            "code_type": sample_sms_code.code_type,
            "code": sample_sms_code.txt_code,
        }
    )
    admin_auth = create_admin_authorization_header()
    response = client.post(
        url_for("user.verify_user_code", user_id=sample_sms_code.user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=payload,
    )

    assert response.status_code == 204
    assert VerifyCode.query.first().code_used
    assert sample_sms_code.user.logged_in_at == datetime.utcnow()
    # The email-access timestamp must not have been set to the frozen "now".
    assert sample_sms_code.user.email_access_validated_at != datetime.utcnow()
    assert sample_sms_code.user.current_session_id is not None
2016-01-21 17:29:24 +00:00
2023-08-29 14:54:30 -07:00
def test_user_verify_code_missing_code(client, sample_sms_code):
    """A verify request without a ``code`` field gets a 400 and changes nothing."""
    assert not VerifyCode.query.first().code_used

    payload = json.dumps({"code_type": sample_sms_code.code_type})
    admin_auth = create_admin_authorization_header()
    response = client.post(
        url_for("user.verify_user_code", user_id=sample_sms_code.user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=payload,
    )

    assert response.status_code == 400
    # The stored code stays unused and no failed login is recorded.
    assert not VerifyCode.query.first().code_used
    assert User.query.get(sample_sms_code.user.id).failed_login_count == 0
2023-08-29 14:54:30 -07:00
def test_user_verify_code_bad_code_and_increments_failed_login_count(
    client, sample_sms_code
):
    """A wrong code gets a 404, leaves the real code unused and counts one failure."""
    assert not VerifyCode.query.first().code_used

    wrong_code_payload = json.dumps(
        {"code_type": sample_sms_code.code_type, "code": "blah"}
    )
    admin_auth = create_admin_authorization_header()
    response = client.post(
        url_for("user.verify_user_code", user_id=sample_sms_code.user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=wrong_code_payload,
    )

    assert response.status_code == 404
    assert not VerifyCode.query.first().code_used
    assert User.query.get(sample_sms_code.user.id).failed_login_count == 1
2023-08-29 14:54:30 -07:00
@pytest.mark.parametrize(
    "failed_login_count, expected_status",
    [
        (9, 204),
        (10, 404),
    ],
)
def test_user_verify_code_rejects_good_code_if_too_many_failed_logins(
    client,
    sample_sms_code,
    failed_login_count,
    expected_status,
):
    """Post a correct code with a preset failed login count.

    The parametrised cases expect 204 with nine prior failures and 404
    with ten.
    """
    sample_sms_code.user.failed_login_count = failed_login_count

    target_url = url_for("user.verify_user_code", user_id=sample_sms_code.user.id)
    payload = json.dumps(
        {
            "code_type": sample_sms_code.code_type,
            "code": sample_sms_code.txt_code,
        }
    )
    request_headers = [
        ("Content-Type", "application/json"),
        create_admin_authorization_header(),
    ]

    response = client.post(target_url, data=payload, headers=request_headers)

    assert response.status_code == expected_status
2023-08-29 14:54:30 -07:00
@freeze_time("2020-04-01 12:00")
@pytest.mark.parametrize("code_type", [EMAIL_TYPE, SMS_TYPE])
def test_user_verify_code_expired_code_and_increments_failed_login_count(
    code_type, admin_request, sample_user
):
    """An expired code is answered with a 400 and counted as a failed login."""
    secret = str(uuid.uuid4())
    expired_code = create_user_code(sample_user, secret, code_type)
    # One minute before the frozen "now", so the code is already past expiry.
    expired_code.expiry_datetime = datetime(2020, 4, 1, 11, 59)

    admin_request.post(
        "user.verify_user_code",
        user_id=sample_user.id,
        _data={"code_type": code_type, "code": secret},
        _expected_status=400,
    )

    assert expired_code.code_used is False
    assert sample_user.logged_in_at is None
    assert sample_user.current_session_id is None
    assert sample_user.failed_login_count == 1
2016-01-21 17:29:24 +00:00
@freeze_time("2016-01-01 10:00:00.000000")
def test_user_verify_password(client, sample_user):
    """A correct password returns 204 and leaves ``logged_in_at`` unchanged."""
    previous_login = datetime.utcnow() - timedelta(days=1)
    sample_user.logged_in_at = previous_login

    payload = json.dumps({"password": "password"})
    admin_auth = create_admin_authorization_header()
    response = client.post(
        url_for("user.verify_user_password", user_id=sample_user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=payload,
    )

    assert response.status_code == 204
    assert User.query.get(sample_user.id).logged_in_at == previous_login
2016-01-21 17:29:24 +00:00
2023-08-29 14:54:30 -07:00
def test_user_verify_password_invalid_password(client, sample_user):
    """A wrong password gets a 400 with an error message and one failed login."""
    payload = json.dumps({"password": "bad password"})
    admin_auth = create_admin_authorization_header()
    assert sample_user.failed_login_count == 0

    response = client.post(
        url_for("user.verify_user_password", user_id=sample_user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=payload,
    )

    assert response.status_code == 400
    body = json.loads(response.get_data(as_text=True))
    assert "Incorrect password" in body["message"]["password"]
    assert sample_user.failed_login_count == 1
2016-01-21 17:29:24 +00:00
2023-08-29 14:54:30 -07:00
def test_user_verify_password_valid_password_resets_failed_logins(client, sample_user):
    """A failed attempt followed by a correct password resets the failure count."""

    def submit_password(password):
        # Each attempt builds its own payload and a fresh admin auth header.
        payload = json.dumps({"password": password})
        admin_auth = create_admin_authorization_header()
        return client.post(
            url_for("user.verify_user_password", user_id=sample_user.id),
            data=payload,
            headers=[("Content-Type", "application/json"), admin_auth],
        )

    assert sample_user.failed_login_count == 0

    # First attempt: wrong password, rejected and counted.
    rejected = submit_password("bad password")
    assert rejected.status_code == 400
    rejected_body = json.loads(rejected.get_data(as_text=True))
    assert "Incorrect password" in rejected_body["message"]["password"]
    assert sample_user.failed_login_count == 1

    # Second attempt: correct password, accepted and the counter is back to zero.
    accepted = submit_password("password")
    assert accepted.status_code == 204
    assert sample_user.failed_login_count == 0
2023-08-29 14:54:30 -07:00
def test_user_verify_password_missing_password(client, sample_user):
    """A body without a ``password`` key gets a 400 naming the missing field."""
    admin_auth = create_admin_authorization_header()
    payload_without_password = json.dumps({"bingo": "bongo"})

    response = client.post(
        url_for("user.verify_user_password", user_id=sample_user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=payload_without_password,
    )

    assert response.status_code == 400
    body = json.loads(response.get_data(as_text=True))
    assert "Required field missing data" in body["message"]["password"]
@freeze_time("2016-01-01 11:09:00.061258")
def test_send_user_sms_code(client, sample_user, sms_code_template, mocker):
    """
    POSTs to ``user.send_user_2fa_code`` with ``code_type="sms"`` and checks
    the stored verify code, the created notification and the queued delivery.
    """
    notify_service = dao_fetch_service_by_id(current_app.config["NOTIFY_SERVICE_ID"])

    # Redis access is mocked: raw_get returns "foo", raw_set is a plain mock.
    mocker.patch(
        "app.celery.scheduled_tasks.redis_store.raw_get", return_value="foo"
    )
    mocker.patch("app.celery.scheduled_tasks.redis_store.raw_set")

    admin_auth = create_admin_authorization_header()
    # Pin the generated secret so the stored code can be checked below.
    secret_code_mock = mocker.patch(
        "app.user.rest.create_secret_code", return_value="11111"
    )
    deliver_sms_mock = mocker.patch(
        "app.celery.provider_tasks.deliver_sms.apply_async"
    )

    response = client.post(
        url_for("user.send_user_2fa_code", code_type="sms", user_id=sample_user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=json.dumps({}),
    )

    assert response.status_code == 204
    assert secret_code_mock.call_count == 1
    assert VerifyCode.query.one().check_code("11111")

    notification = Notification.query.one()
    assert notification.personalisation == {"verify_code": "11111"}
    assert notification.to == "1"
    assert str(notification.service_id) == current_app.config["NOTIFY_SERVICE_ID"]
    assert notification.reply_to_text == notify_service.get_default_sms_sender()
    deliver_sms_mock.assert_called_once_with(
        [str(notification.id)], queue="notify-internal-tasks"
    )
@freeze_time("2016-01-01 11:09:00.061258")
def test_send_user_code_for_sms_with_optional_to_field(
    client, sample_user, sms_code_template, mocker
):
    """
    POSTs to ``user.send_user_2fa_code`` with ``code_type="sms"`` and a ``to``
    value in the body, then checks the notification and the queued delivery.
    """
    # Redis access is mocked: raw_get returns "foo", raw_set is a plain mock.
    mocker.patch(
        "app.celery.scheduled_tasks.redis_store.raw_get", return_value="foo"
    )
    mocker.patch("app.celery.scheduled_tasks.redis_store.raw_set")

    recipient = "+447119876757"
    secret_code_mock = mocker.patch(
        "app.user.rest.create_secret_code", return_value="11111"
    )
    deliver_sms_mock = mocker.patch(
        "app.celery.provider_tasks.deliver_sms.apply_async"
    )
    admin_auth = create_admin_authorization_header()

    response = client.post(
        url_for("user.send_user_2fa_code", code_type="sms", user_id=sample_user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=json.dumps({"to": recipient}),
    )

    assert response.status_code == 204
    assert secret_code_mock.call_count == 1
    notification = Notification.query.first()
    assert notification.to == "1"
    deliver_sms_mock.assert_called_once_with(
        [str(notification.id)], queue="notify-internal-tasks"
    )
def test_send_sms_code_returns_404_for_bad_input_data(client):
    """Requesting an SMS code for a random, unknown user id returns a 404."""
    unknown_user_id = uuid.uuid4()
    admin_auth = create_admin_authorization_header()

    response = client.post(
        url_for(
            "user.send_user_2fa_code", code_type="sms", user_id=unknown_user_id
        ),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=json.dumps({}),
    )

    assert response.status_code == 404
    body = json.loads(response.get_data(as_text=True))
    assert body["message"] == "No result found"
2023-08-29 14:54:30 -07:00
def test_send_sms_code_returns_204_when_too_many_codes_already_created(
    client, sample_user
):
    """With five codes already stored, the request returns 204 and adds no code."""
    code_age = timedelta(minutes=10)
    time_to_expiry = timedelta(minutes=40)
    for _ in range(5):
        db.session.add(
            VerifyCode(
                code_type="sms",
                _code=12345,
                created_at=datetime.utcnow() - code_age,
                expiry_datetime=datetime.utcnow() + time_to_expiry,
                user=sample_user,
            )
        )
        db.session.commit()
    assert VerifyCode.query.count() == 5

    admin_auth = create_admin_authorization_header()
    response = client.post(
        url_for("user.send_user_2fa_code", code_type="sms", user_id=sample_user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=json.dumps({}),
    )

    assert response.status_code == 204
    # Still five: the request did not create another code.
    assert VerifyCode.query.count() == 5
2023-08-29 14:54:30 -07:00
@pytest.mark.parametrize(
    "post_data, expected_url_starts_with",
    [
        ({}, "http://localhost"),
        ({"admin_base_url": "https://example.com"}, "https://example.com"),
    ],
)
def test_send_new_user_email_verification(
    client,
    sample_user,
    mocker,
    email_verification_template,
    post_data,
    expected_url_starts_with,
):
    """POST to ``user.send_new_user_email_verification`` and inspect the email.

    Checks the queued delivery task, the reply-to address and the
    personalisation; the link prefix depends on the posted ``admin_base_url``.
    """
    deliver_email_mock = mocker.patch(
        "app.celery.provider_tasks.deliver_email.apply_async"
    )
    admin_auth = create_admin_authorization_header()

    response = client.post(
        url_for("user.send_new_user_email_verification", user_id=str(sample_user.id)),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=json.dumps(post_data),
    )
    notify_service = email_verification_template.service

    assert response.status_code == 204
    notification = Notification.query.first()
    # This endpoint is expected to leave the verify-code table empty.
    assert VerifyCode.query.count() == 0
    deliver_email_mock.assert_called_once_with(
        [str(notification.id)], queue="notify-internal-tasks"
    )
    expected_reply_to = notify_service.get_default_reply_to_email_address()
    assert notification.reply_to_text == expected_reply_to
    assert notification.personalisation["name"] == "Test User"
    assert notification.personalisation["url"].startswith(expected_url_starts_with)
2023-08-29 14:54:30 -07:00
def test_send_email_verification_returns_404_for_bad_input_data(
    client, notify_db_session, mocker
):
    """
    POSTs to ``user.send_new_user_email_verification`` with an unknown user id
    and expects a 404 with no email delivery queued.
    """
    deliver_email_mock = mocker.patch(
        "app.celery.provider_tasks.deliver_email.apply_async"
    )
    unknown_user_id = uuid.uuid4()
    admin_auth = create_admin_authorization_header()

    response = client.post(
        url_for("user.send_new_user_email_verification", user_id=unknown_user_id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=json.dumps({}),
    )

    assert response.status_code == 404
    body = json.loads(response.get_data(as_text=True))
    assert body["message"] == "No result found"
    assert deliver_email_mock.call_count == 0
2023-08-29 14:54:30 -07:00
def test_user_verify_user_code_returns_404_when_code_is_right_but_user_account_is_locked(
    client, sample_sms_code
):
    """With ten failed logins recorded, even the correct code gets a 404."""
    sample_sms_code.user.failed_login_count = 10

    payload = json.dumps(
        {
            "code_type": sample_sms_code.code_type,
            "code": sample_sms_code.txt_code,
        }
    )
    target_url = url_for("user.verify_user_code", user_id=sample_sms_code.user.id)
    request_headers = [
        ("Content-Type", "application/json"),
        create_admin_authorization_header(),
    ]

    response = client.post(target_url, data=payload, headers=request_headers)

    assert response.status_code == 404
    # The counter is unchanged and the code has not been consumed.
    assert sample_sms_code.user.failed_login_count == 10
    assert not sample_sms_code.code_used
2023-08-29 14:54:30 -07:00
def test_user_verify_user_code_valid_code_resets_failed_login_count(
    client, sample_sms_code
):
    """A correct code returns 204, zeroes the failure count and is marked used."""
    sample_sms_code.user.failed_login_count = 1

    payload = json.dumps(
        {
            "code_type": sample_sms_code.code_type,
            "code": sample_sms_code.txt_code,
        }
    )
    target_url = url_for("user.verify_user_code", user_id=sample_sms_code.user.id)
    request_headers = [
        ("Content-Type", "application/json"),
        create_admin_authorization_header(),
    ]

    response = client.post(target_url, data=payload, headers=request_headers)

    assert response.status_code == 204
    assert sample_sms_code.user.failed_login_count == 0
    assert sample_sms_code.code_used
def test_user_reset_failed_login_count_returns_200(client, sample_user):
    """Posting to the reset endpoint returns 200 and zeroes the failure count."""
    sample_user.failed_login_count = 1

    target_url = url_for("user.user_reset_failed_login_count", user_id=sample_user.id)
    request_headers = [
        ("Content-Type", "application/json"),
        create_admin_authorization_header(),
    ]
    response = client.post(target_url, data={}, headers=request_headers)

    assert response.status_code == 200
    assert sample_user.failed_login_count == 0
def test_reset_failed_login_count_returns_404_when_user_does_not_exist(client):
    """Posting to the reset endpoint with a random user id returns 404."""
    target_url = url_for("user.user_reset_failed_login_count", user_id=uuid.uuid4())
    request_headers = [
        ("Content-Type", "application/json"),
        create_admin_authorization_header(),
    ]

    response = client.post(target_url, data={}, headers=request_headers)

    assert response.status_code == 404
# we send sms_auth users and webauthn_auth users email code to validate their email access
@pytest.mark.parametrize("auth_type", USER_AUTH_TYPES)
@pytest.mark.parametrize(
    "data, expected_auth_url",
    [
        ({}, "http://localhost:6012/email-auth/%2E"),
        ({"to": None}, "http://localhost:6012/email-auth/%2E"),
        (
            {"to": None, "email_auth_link_host": "https://example.com"},
            "https://example.com/email-auth/%2E",
        ),
    ],
)
def test_send_user_email_code(
    admin_request,
    mocker,
    sample_user,
    email_2fa_code_template,
    data,
    expected_auth_url,
    auth_type,
):
    """Request an email 2FA code for every auth type and inspect the email.

    Checks the notification's reply-to, template, personalisation and link
    prefix, and that exactly one delivery task was queued for it.
    """
    deliver_email = mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
    sample_user.auth_type = auth_type
    # Redis access is mocked: raw_get returns "foo", raw_set is a plain mock.
    mocker.patch(
        "app.celery.scheduled_tasks.redis_store.raw_get", return_value="foo"
    )
    mocker.patch("app.celery.scheduled_tasks.redis_store.raw_set")

    admin_request.post(
        "user.send_user_2fa_code",
        code_type="email",
        user_id=sample_user.id,
        _data=data,
        _expected_status=204,
    )

    notification = Notification.query.one()
    expected_reply_to = (
        email_2fa_code_template.service.get_default_reply_to_email_address()
    )
    assert notification.reply_to_text == expected_reply_to
    assert notification.to == "1"
    assert str(notification.template_id) == current_app.config["EMAIL_2FA_TEMPLATE_ID"]
    assert notification.personalisation["name"] == "Test User"
    assert notification.personalisation["url"].startswith(expected_auth_url)
    deliver_email.assert_called_once_with(
        [str(notification.id)], queue="notify-internal-tasks"
    )
2023-08-29 14:54:30 -07:00
def test_send_user_email_code_with_urlencoded_next_param(
    admin_request, mocker, sample_user, email_2fa_code_template
):
    """A ``next`` value in the body ends up URL-encoded at the end of the link."""
    mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
    # Redis access is mocked: raw_get returns "foo", raw_set is a plain mock.
    mocker.patch(
        "app.celery.scheduled_tasks.redis_store.raw_get", return_value="foo"
    )
    mocker.patch("app.celery.scheduled_tasks.redis_store.raw_set")

    admin_request.post(
        "user.send_user_2fa_code",
        code_type="email",
        user_id=sample_user.id,
        _data={"to": None, "next": "/services"},
        _expected_status=204,
    )

    notification = Notification.query.one()
    assert notification.personalisation["url"].endswith("?next=%2Fservices")
def test_send_email_code_returns_404_for_bad_input_data(admin_request):
    """Requesting an email code for a random, unknown user id returns a 404."""
    unknown_user_id = uuid.uuid4()

    error_body = admin_request.post(
        "user.send_user_2fa_code",
        code_type="email",
        user_id=unknown_user_id,
        _data={},
        _expected_status=404,
    )

    assert error_body["message"] == "No result found"
2023-08-29 14:54:30 -07:00
@freeze_time("2016-01-01T12:00:00")
# we send sms_auth and webauthn_auth users email code to validate their email access
@pytest.mark.parametrize("auth_type", USER_AUTH_TYPES)
def test_user_verify_email_code(admin_request, sample_user, auth_type):
    """Verifying an email code returns 204 and refreshes both user timestamps.

    Time is frozen so the timestamps can be compared with ``datetime.utcnow()``.
    """
    a_day_ago = timedelta(days=1)
    sample_user.logged_in_at = datetime.utcnow() - a_day_ago
    sample_user.email_access_validated_at = datetime.utcnow() - a_day_ago
    sample_user.auth_type = auth_type

    secret = str(uuid.uuid4())
    email_code = create_user_code(sample_user, secret, EMAIL_TYPE)

    admin_request.post(
        "user.verify_user_code",
        user_id=sample_user.id,
        _data={"code_type": "email", "code": secret},
        _expected_status=204,
    )

    assert email_code.code_used
    assert sample_user.logged_in_at == datetime.utcnow()
    assert sample_user.email_access_validated_at == datetime.utcnow()
    assert sample_user.current_session_id is not None
2017-11-06 16:46:53 +00:00
2023-08-29 14:54:30 -07:00
@pytest.mark.parametrize("code_type", [EMAIL_TYPE, SMS_TYPE])
@freeze_time("2016-01-01T12:00:00")
def test_user_verify_email_code_fails_if_code_already_used(
    admin_request, sample_user, code_type
):
    """A code already marked as used is answered with 400 and logs nobody in."""
    secret = str(uuid.uuid4())
    used_code = create_user_code(sample_user, secret, code_type)
    used_code.code_used = True

    admin_request.post(
        "user.verify_user_code",
        user_id=sample_user.id,
        _data={"code_type": code_type, "code": secret},
        _expected_status=400,
    )

    assert used_code.code_used
    assert sample_user.logged_in_at is None
    assert sample_user.current_session_id is None
def test_send_user_2fa_code_sends_from_number_for_international_numbers(
    client, sample_user, mocker, sms_code_template
):
    """For the number set below, the reply-to is the international SMS sender."""
    # Redis access is mocked: raw_get returns "foo", raw_set is a plain mock.
    mocker.patch(
        "app.celery.scheduled_tasks.redis_store.raw_get", return_value="foo"
    )
    mocker.patch("app.celery.scheduled_tasks.redis_store.raw_set")

    sample_user.mobile_number = "+601117224412"
    admin_auth = create_admin_authorization_header()
    mocker.patch("app.user.rest.create_secret_code", return_value="11111")
    mocker.patch("app.user.rest.send_notification_to_queue")

    response = client.post(
        url_for("user.send_user_2fa_code", code_type="sms", user_id=sample_user.id),
        headers=[("Content-Type", "application/json"), admin_auth],
        data=json.dumps({}),
    )

    assert response.status_code == 204
    notification = Notification.query.first()
    expected_sender = current_app.config["NOTIFY_INTERNATIONAL_SMS_SENDER"]
    assert notification.reply_to_text == expected_sender