mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-24 03:49:46 -05:00
@@ -349,7 +349,7 @@
|
||||
"filename": "tests/app/user/test_rest.py",
|
||||
"hashed_secret": "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33",
|
||||
"is_verified": false,
|
||||
"line_number": 962,
|
||||
"line_number": 810,
|
||||
"is_secret": false
|
||||
}
|
||||
],
|
||||
@@ -384,5 +384,5 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"generated_at": "2024-07-10T20:12:22Z"
|
||||
"generated_at": "2024-07-22T21:27:35Z"
|
||||
}
|
||||
|
||||
2
Makefile
2
Makefile
@@ -54,7 +54,7 @@ run-celery: ## Run celery, TODO remove purge for staging/prod
|
||||
|
||||
|
||||
.PHONY: dead-code
|
||||
dead-code:
|
||||
dead-code: ## Use 60 to look for suspected dead code
|
||||
poetry run vulture ./app --min-confidence=100
|
||||
|
||||
.PHONY: run-celery-beat
|
||||
|
||||
@@ -10,11 +10,6 @@ from app.exceptions import ArchiveValidationError
|
||||
from notifications_utils.recipients import InvalidEmailError
|
||||
|
||||
|
||||
class VirusScanError(Exception):
|
||||
def __init__(self, message):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class InvalidRequest(Exception):
|
||||
code = None
|
||||
fields = []
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from flask import Blueprint, jsonify, request
|
||||
|
||||
from app.celery.process_ses_receipts_tasks import process_ses_results
|
||||
@@ -8,7 +6,6 @@ from app.errors import InvalidRequest
|
||||
from app.notifications.sns_handlers import sns_notification_handler
|
||||
|
||||
ses_callback_blueprint = Blueprint("notifications_ses_callback", __name__)
|
||||
DEFAULT_MAX_AGE = timedelta(days=10000)
|
||||
|
||||
|
||||
# 400 counts as a permanent failure so SNS will not retry.
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import enum
|
||||
from datetime import timedelta
|
||||
from json import decoder
|
||||
|
||||
import requests
|
||||
@@ -8,8 +7,6 @@ from flask import current_app, json
|
||||
from app.errors import InvalidRequest
|
||||
from app.notifications.sns_cert_validator import validate_sns_cert
|
||||
|
||||
DEFAULT_MAX_AGE = timedelta(days=10000)
|
||||
|
||||
|
||||
class SNSMessageType(enum.Enum):
|
||||
SubscriptionConfirmation = "SubscriptionConfirmation"
|
||||
|
||||
@@ -30,7 +30,6 @@ from app.dao.users_dao import (
|
||||
reset_failed_login_count,
|
||||
save_model_user,
|
||||
save_user_attribute,
|
||||
update_user_password,
|
||||
use_user_code,
|
||||
)
|
||||
from app.enums import CodeType, KeyType, NotificationType, TemplateType
|
||||
@@ -45,7 +44,6 @@ from app.schemas import (
|
||||
create_user_schema,
|
||||
email_data_request_schema,
|
||||
partial_email_data_request_schema,
|
||||
user_update_password_schema_load_json,
|
||||
user_update_schema_load_json,
|
||||
)
|
||||
from app.user.users_schema import (
|
||||
@@ -628,57 +626,6 @@ def get_all_users():
|
||||
return jsonify(data=result), 200
|
||||
|
||||
|
||||
@user_blueprint.route("/reset-password", methods=["POST"])
|
||||
def send_user_reset_password():
|
||||
request_json = request.get_json()
|
||||
email = email_data_request_schema.load(request_json)
|
||||
|
||||
user_to_send_to = get_user_by_email(email["email"])
|
||||
template = dao_get_template_by_id(current_app.config["PASSWORD_RESET_TEMPLATE_ID"])
|
||||
service = Service.query.get(current_app.config["NOTIFY_SERVICE_ID"])
|
||||
personalisation = {
|
||||
"user_name": user_to_send_to.name,
|
||||
"url": _create_reset_password_url(
|
||||
user_to_send_to.email_address,
|
||||
base_url=request_json.get("admin_base_url"),
|
||||
next_redirect=request_json.get("next"),
|
||||
),
|
||||
}
|
||||
saved_notification = persist_notification(
|
||||
template_id=template.id,
|
||||
template_version=template.version,
|
||||
recipient=email["email"],
|
||||
service=service,
|
||||
personalisation=None,
|
||||
notification_type=template.template_type,
|
||||
api_key_id=None,
|
||||
key_type=KeyType.NORMAL,
|
||||
reply_to_text=service.get_default_reply_to_email_address(),
|
||||
)
|
||||
saved_notification.personalisation = personalisation
|
||||
|
||||
redis_store.set(
|
||||
f"email-personalisation-{saved_notification.id}",
|
||||
json.dumps(personalisation),
|
||||
ex=60 * 60,
|
||||
)
|
||||
send_notification_to_queue(saved_notification, queue=QueueNames.NOTIFY)
|
||||
|
||||
return jsonify({}), 204
|
||||
|
||||
|
||||
@user_blueprint.route("/<uuid:user_id>/update-password", methods=["POST"])
|
||||
def update_password(user_id):
|
||||
user = get_user_by_id(user_id=user_id)
|
||||
req_json = request.get_json()
|
||||
password = req_json.get("_password")
|
||||
|
||||
user_update_password_schema_load_json.load(req_json)
|
||||
|
||||
update_user_password(user, password)
|
||||
return jsonify(data=user.serialize()), 200
|
||||
|
||||
|
||||
@user_blueprint.route("/report-all-users", methods=["GET"])
|
||||
def report_all_users():
|
||||
users = dao_report_users()
|
||||
@@ -692,17 +639,6 @@ def get_organizations_and_services_for_user(user_id):
|
||||
return jsonify(data)
|
||||
|
||||
|
||||
def _create_reset_password_url(email, next_redirect, base_url=None):
|
||||
data = json.dumps({"email": email, "created_at": str(utc_now())})
|
||||
static_url_part = "/new-password/"
|
||||
full_url = url_with_token(
|
||||
data, static_url_part, current_app.config, base_url=base_url
|
||||
)
|
||||
if next_redirect:
|
||||
full_url += "?{}".format(urlencode({"next": next_redirect}))
|
||||
return full_url
|
||||
|
||||
|
||||
def _create_verification_url(user, base_url):
|
||||
data = json.dumps({"user_id": str(user.id), "email": user.email_address})
|
||||
url = "/verify-email/"
|
||||
|
||||
19
app/utils.py
19
app/utils.py
@@ -60,11 +60,6 @@ def get_midnight_in_utc(date):
|
||||
return datetime.combine(date, datetime.min.time())
|
||||
|
||||
|
||||
def get_midnight_for_day_before(date):
|
||||
day_before = date - timedelta(1)
|
||||
return get_midnight_in_utc(day_before)
|
||||
|
||||
|
||||
def get_month_from_utc_column(column):
|
||||
"""
|
||||
Where queries need to count notifications by month it needs to be
|
||||
@@ -112,20 +107,6 @@ def get_dt_string_or_none(val):
|
||||
return val.strftime(DATETIME_FORMAT) if val else None
|
||||
|
||||
|
||||
def get_uuid_string_or_none(val):
|
||||
return str(val) if val else None
|
||||
|
||||
|
||||
def format_sequential_number(sequential_number):
|
||||
return format(sequential_number, "x").zfill(8)
|
||||
|
||||
|
||||
def get_reference_from_personalisation(personalisation):
|
||||
if personalisation:
|
||||
return personalisation.get("reference")
|
||||
return None
|
||||
|
||||
|
||||
# Function used for debugging.
|
||||
# Do print(hilite(message)) while debugging, then remove your print statements
|
||||
def hilite(message):
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
from app.errors import VirusScanError
|
||||
|
||||
|
||||
def test_virus_scan_error():
|
||||
vse = VirusScanError("a message")
|
||||
assert "a message" in vse.args
|
||||
@@ -1,4 +1,3 @@
|
||||
import uuid
|
||||
from datetime import date, datetime
|
||||
|
||||
import pytest
|
||||
@@ -6,12 +5,8 @@ from freezegun import freeze_time
|
||||
|
||||
from app.enums import ServicePermissionType
|
||||
from app.utils import (
|
||||
format_sequential_number,
|
||||
get_midnight_for_day_before,
|
||||
get_midnight_in_utc,
|
||||
get_public_notify_type_text,
|
||||
get_reference_from_personalisation,
|
||||
get_uuid_string_or_none,
|
||||
midnight_n_days_ago,
|
||||
)
|
||||
|
||||
@@ -31,18 +26,6 @@ def test_get_midnight_in_utc_returns_expected_date(date, expected_date):
|
||||
assert get_midnight_in_utc(date) == expected_date
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"date, expected_date",
|
||||
[
|
||||
(datetime(2016, 1, 15, 0, 30), datetime(2016, 1, 14, 0, 0)),
|
||||
(datetime(2016, 7, 15, 0, 0), datetime(2016, 7, 14, 0, 0)),
|
||||
(datetime(2016, 8, 23, 11, 59), datetime(2016, 8, 22, 0, 0)),
|
||||
],
|
||||
)
|
||||
def test_get_midnight_for_day_before_returns_expected_date(date, expected_date):
|
||||
assert get_midnight_for_day_before(date) == expected_date
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"current_time, arg, expected_datetime",
|
||||
[
|
||||
@@ -65,40 +48,7 @@ def test_midnight_n_days_ago(current_time, arg, expected_datetime):
|
||||
assert midnight_n_days_ago(arg) == expected_datetime
|
||||
|
||||
|
||||
def test_format_sequential_number():
|
||||
assert format_sequential_number(123) == "0000007b"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"personalisation, expected_response",
|
||||
[
|
||||
({"nothing": "interesting"}, None),
|
||||
({"reference": "something"}, "something"),
|
||||
(None, None),
|
||||
],
|
||||
)
|
||||
def test_get_reference_from_personalisation(personalisation, expected_response):
|
||||
assert get_reference_from_personalisation(personalisation) == expected_response
|
||||
|
||||
|
||||
def test_get_uuid_string_or_none():
|
||||
my_uuid = uuid.uuid4()
|
||||
assert str(my_uuid) == get_uuid_string_or_none(my_uuid)
|
||||
|
||||
assert get_uuid_string_or_none(None) is None
|
||||
|
||||
|
||||
def test_get_public_notify_type_text():
|
||||
assert (
|
||||
get_public_notify_type_text(ServicePermissionType.UPLOAD_DOCUMENT) == "document"
|
||||
)
|
||||
|
||||
|
||||
# This method is used for simulating bulk sends. We use localstack and run on a developer's machine to do the
|
||||
# simulation. Please see docs->bulk_testing.md for instructions.
|
||||
# def test_generate_csv_for_bulk_testing():
|
||||
# f = open("bulktest_100000.csv", "w")
|
||||
# f.write("phone number\n")
|
||||
# for _ in range(0, 100000):
|
||||
# f.write("16615555555\n")
|
||||
# f.close()
|
||||
|
||||
@@ -632,137 +632,6 @@ def test_remove_user_folder_permissions(admin_request, sample_user, sample_servi
|
||||
assert service_user.folders == []
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_send_user_reset_password_should_send_reset_password_link(
|
||||
admin_request, sample_user, mocker, password_reset_email_template
|
||||
):
|
||||
mocked = mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
|
||||
data = {"email": sample_user.email_address}
|
||||
notify_service = password_reset_email_template.service
|
||||
|
||||
admin_request.post(
|
||||
"user.send_user_reset_password",
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
notification = Notification.query.first()
|
||||
mocked.assert_called_once_with(
|
||||
[str(notification.id)], queue="notify-internal-tasks"
|
||||
)
|
||||
assert (
|
||||
notification.reply_to_text
|
||||
== notify_service.get_default_reply_to_email_address()
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"data, expected_url",
|
||||
(
|
||||
(
|
||||
{
|
||||
"email": "notify@digital.fake.gov",
|
||||
},
|
||||
("http://localhost:6012/new-password/"),
|
||||
),
|
||||
(
|
||||
{
|
||||
"email": "notify@digital.fake.gov",
|
||||
"admin_base_url": "https://different.example.com",
|
||||
},
|
||||
("https://different.example.com/new-password/"),
|
||||
),
|
||||
),
|
||||
)
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_send_user_reset_password_should_use_provided_base_url(
|
||||
admin_request,
|
||||
sample_user,
|
||||
password_reset_email_template,
|
||||
mocker,
|
||||
data,
|
||||
expected_url,
|
||||
):
|
||||
mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
|
||||
|
||||
admin_request.post(
|
||||
"user.send_user_reset_password",
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
assert Notification.query.first().personalisation["url"].startswith(expected_url)
|
||||
|
||||
|
||||
@freeze_time("2016-01-01 11:09:00.061258")
|
||||
def test_send_user_reset_password_reset_password_link_contains_redirect_link_if_present_in_request(
|
||||
admin_request, sample_user, mocker, password_reset_email_template
|
||||
):
|
||||
mocked = mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
|
||||
data = {"email": sample_user.email_address, "next": "blob"}
|
||||
|
||||
admin_request.post(
|
||||
"user.send_user_reset_password",
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
notification = Notification.query.first()
|
||||
assert "?next=blob" in notification.content
|
||||
mocked.assert_called_once_with(
|
||||
[str(notification.id)], queue="notify-internal-tasks"
|
||||
)
|
||||
|
||||
|
||||
def test_send_user_reset_password_should_return_400_when_email_is_missing(
|
||||
admin_request, mocker
|
||||
):
|
||||
mocked = mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
|
||||
data = {}
|
||||
|
||||
json_resp = admin_request.post(
|
||||
"user.send_user_reset_password",
|
||||
_data=data,
|
||||
_expected_status=400,
|
||||
)
|
||||
assert json_resp["message"] == {"email": ["Missing data for required field."]}
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_send_user_reset_password_should_return_400_when_user_doesnot_exist(
|
||||
admin_request, mocker
|
||||
):
|
||||
mocked = mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
|
||||
bad_email_address = "bad@email.gov.uk"
|
||||
data = {"email": bad_email_address}
|
||||
|
||||
json_resp = admin_request.post(
|
||||
"user.send_user_reset_password",
|
||||
_data=data,
|
||||
_expected_status=404,
|
||||
)
|
||||
|
||||
assert json_resp["message"] == "No result found"
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_send_user_reset_password_should_return_400_when_data_is_not_email_address(
|
||||
admin_request, mocker
|
||||
):
|
||||
mocked = mocker.patch("app.celery.provider_tasks.deliver_email.apply_async")
|
||||
bad_email_address = "bad.email.gov.uk"
|
||||
data = {"email": bad_email_address}
|
||||
|
||||
json_resp = admin_request.post(
|
||||
"user.send_user_reset_password",
|
||||
_data=data,
|
||||
_expected_status=400,
|
||||
)
|
||||
|
||||
assert json_resp["message"] == {"email": ["Not a valid email address"]}
|
||||
assert mocked.call_count == 0
|
||||
|
||||
|
||||
def test_send_already_registered_email(
|
||||
admin_request, sample_user, already_registered_template, mocker
|
||||
):
|
||||
@@ -842,27 +711,6 @@ def test_send_user_confirm_new_email_returns_400_when_email_missing(
|
||||
mocked.assert_not_called()
|
||||
|
||||
|
||||
@freeze_time("2020-02-14T12:00:00")
|
||||
def test_update_user_password_saves_correctly(admin_request, sample_service):
|
||||
sample_user = sample_service.users[0]
|
||||
new_password = "1234567890"
|
||||
data = {"_password": "1234567890"}
|
||||
|
||||
json_resp = admin_request.post(
|
||||
"user.update_password", user_id=str(sample_user.id), _data=data
|
||||
)
|
||||
|
||||
assert json_resp["data"]["password_changed_at"] is not None
|
||||
data = {"password": new_password}
|
||||
|
||||
admin_request.post(
|
||||
"user.verify_user_password",
|
||||
user_id=str(sample_user.id),
|
||||
_data=data,
|
||||
_expected_status=204,
|
||||
)
|
||||
|
||||
|
||||
def test_activate_user(admin_request, sample_user):
|
||||
sample_user.state = "pending"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user