Files
notifications-admin/tests/__init__.py

665 lines
20 KiB
Python

import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from urllib.parse import parse_qs, urlparse
import freezegun
import pytest
from flask import session as flask_session
from flask import url_for
from flask.testing import FlaskClient
from flask_login import login_user
from app.models.user import User
# Add itsdangerous to the libraries which freezegun ignores to avoid errors.
# In tests where we freeze time, the code in the test function will get the frozen time but the
# fixtures will be using the current time. This causes itsdangerous to raise an exception - when
# the session is decoded it appears to be created in the future.
freezegun.configure(extend_ignore_list=["itsdangerous"])
class TestClient(FlaskClient):
def login(self, user, mocker=None, service=None):
# Skipping authentication here and just log them in
model_user = User(user)
with self.session_transaction() as session:
session["current_session_id"] = model_user.current_session_id
session["user_id"] = model_user.id
if mocker:
mocker.patch("app.user_api_client.get_user", return_value=user)
if mocker and service:
with self.session_transaction() as session:
session["service_id"] = service["id"]
mocker.patch(
"app.service_api_client.get_service", return_value={"data": service}
)
with patch("app.events_api_client.create_event"):
login_user(model_user)
with self.session_transaction() as test_session:
for key, value in flask_session.items():
test_session[key] = value
def logout(self, user):
self.get(url_for("main.sign_out"))
def sample_uuid():
return "6ce466d0-fd6a-11e5-82f5-e0accb9d11a6"
def generate_uuid():
return uuid.uuid4()
def created_by_json(id_, name="", email_address=""):
return {"id": id_, "name": name, "email_address": email_address}
def user_json(
id_="1234",
name="Test User",
email_address="test@gsa.gov",
mobile_number="+12028675109",
password_changed_at=None,
permissions=None,
auth_type="sms_auth",
failed_login_count=0,
logged_in_at=None,
state="active",
platform_admin=False,
current_session_id="1234",
organizations=None,
services=None,
):
if permissions is None:
permissions = {
str(generate_uuid()): [
"view_activity",
"send_texts",
"send_emails",
"manage_users",
"manage_templates",
"manage_settings",
"manage_api_keys",
]
}
if services is None:
services = [str(service_id) for service_id in permissions.keys()]
return {
"id": id_,
"name": name,
"email_address": email_address,
"mobile_number": mobile_number,
"password_changed_at": password_changed_at,
"permissions": permissions,
"auth_type": auth_type,
"failed_login_count": failed_login_count,
"logged_in_at": logged_in_at
or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
"state": state,
"platform_admin": platform_admin,
"current_session_id": current_session_id,
"organizations": organizations or [],
"services": services,
}
def invited_user(
_id="1234",
service=None,
from_user="1234",
email_address="testinviteduser@gsa.gov",
permissions=None,
status="pending",
created_at=None,
auth_type="sms_auth",
organization=None,
):
data = {
"id": _id,
"from_user": from_user,
"email_address": email_address,
"status": status,
"created_at": created_at or datetime.utcnow(),
"auth_type": auth_type,
}
if service:
data["service"] = service
if permissions:
data["permissions"] = permissions
if organization:
data["organization"] = organization
return data
def service_json(
id_="1234",
name="Test Service",
users=None,
message_limit=1000,
active=True,
restricted=True,
email_from=None,
reply_to_email_address=None,
sms_sender="GOVUK",
research_mode=False,
email_branding=None,
branding="govuk",
created_at=None,
inbound_api=None,
service_callback_api=None,
permissions=None,
organization_type="federal",
prefix_sms=True,
contact_link=None,
organization_id=None,
rate_limit=3000,
notes=None,
billing_contact_email_addresses=None,
billing_contact_names=None,
billing_reference=None,
purchase_order_number=None,
):
if users is None:
users = []
if permissions is None:
permissions = ["email", "sms"]
if service_callback_api is None:
service_callback_api = []
if inbound_api is None:
inbound_api = []
return {
"id": id_,
"name": name,
"users": users,
"message_limit": message_limit,
"rate_limit": rate_limit,
"active": active,
"restricted": restricted,
"email_from": email_from,
"reply_to_email_address": reply_to_email_address,
"sms_sender": sms_sender,
"research_mode": research_mode,
"organization_type": organization_type,
"email_branding": email_branding,
"branding": branding,
"created_at": created_at or str(datetime.utcnow()),
"permissions": permissions,
"inbound_api": inbound_api,
"service_callback_api": service_callback_api,
"prefix_sms": prefix_sms,
"contact_link": contact_link,
"volume_email": 111111,
"volume_sms": 222222,
"consent_to_research": True,
"count_as_live": True,
"organization": organization_id,
"notes": notes,
"billing_contact_email_addresses": billing_contact_email_addresses,
"billing_contact_names": billing_contact_names,
"billing_reference": billing_reference,
"purchase_order_number": purchase_order_number,
}
def organization_json(
id_="1234",
name=False,
users=None,
active=True,
created_at=None,
services=None,
email_branding_id=None,
domains=None,
agreement_signed=False,
agreement_signed_version=None,
agreement_signed_by_id=None,
agreement_signed_on_behalf_of_name=None,
agreement_signed_on_behalf_of_email_address=None,
organization_type="federal",
notes=None,
billing_contact_email_addresses=None,
billing_contact_names=None,
billing_reference=None,
purchase_order_number=None,
):
if users is None:
users = []
if services is None:
services = []
return {
"id": id_,
"name": "Test Organization" if name is False else name,
"active": active,
"users": users,
"created_at": created_at or str(datetime.utcnow()),
"email_branding_id": email_branding_id,
"organization_type": organization_type,
"agreement_signed": agreement_signed,
"agreement_signed_at": None,
"agreement_signed_by_id": agreement_signed_by_id,
"agreement_signed_version": agreement_signed_version,
"agreement_signed_on_behalf_of_name": agreement_signed_on_behalf_of_name,
"agreement_signed_on_behalf_of_email_address": agreement_signed_on_behalf_of_email_address,
"domains": domains or [],
"count_of_live_services": len(services),
"notes": notes,
"billing_contact_email_addresses": billing_contact_email_addresses,
"billing_contact_names": billing_contact_names,
"billing_reference": billing_reference,
"purchase_order_number": purchase_order_number,
}
def template_json(
service_id,
id_,
name="sample template",
type_=None,
content=None,
subject=None,
version=1,
archived=False,
process_type="normal",
redact_personalisation=None,
reply_to=None,
reply_to_text=None,
folder=None,
):
template = {
"id": id_,
"name": name,
"template_type": type_ or "sms",
"content": content,
"service": service_id,
"version": version,
"updated_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
"archived": archived,
"process_type": process_type,
"reply_to": reply_to,
"reply_to_text": reply_to_text,
"folder": folder,
}
if content is None:
template["content"] = "template content"
if subject is None and type_ != "sms":
template["subject"] = "template subject"
if subject is not None:
template["subject"] = subject
if redact_personalisation is not None:
template["redact_personalisation"] = redact_personalisation
return template
def template_version_json(
service_id, id_, created_by, version=1, created_at=None, **kwargs
):
template = template_json(service_id, id_, **kwargs)
template["created_by"] = created_by_json(
created_by["id"],
created_by["name"],
created_by["email_address"],
)
if created_at is None:
created_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")
template["created_at"] = created_at
template["version"] = version
return template
def api_key_json(id_, name, expiry_date=None):
return {"id": id_, "name": name, "expiry_date": expiry_date}
def invite_json(
id_,
from_user,
service_id,
email_address,
permissions,
created_at,
status,
auth_type,
folder_permissions,
):
return {
"id": id_,
"from_user": from_user,
"service": service_id,
"email_address": email_address,
"status": status,
"permissions": permissions,
"created_at": created_at,
"auth_type": auth_type,
"folder_permissions": folder_permissions,
}
def org_invite_json(id_, invited_by, org_id, email_address, created_at, status):
return {
"id": id_,
"invited_by": invited_by,
"organization": org_id,
"email_address": email_address,
"status": status,
"created_at": created_at,
}
def inbound_sms_json():
return {
"has_next": True,
"data": [
{
"user_number": phone_number,
"notify_number": "+12028675309",
"content": f"message-{index + 1}",
"created_at": (
datetime.utcnow() - timedelta(minutes=60 * hours_ago, seconds=index)
).isoformat(),
"id": sample_uuid(),
}
for index, hours_ago, phone_number in (
(0, 1, "+12028675300"),
(1, 1, "2028675300"),
(2, 1, "2028675300"),
(3, 3, "2028675302"),
(4, 5, "+33(0)1 12345678"), # France
(5, 7, "+1-202-555-0104"), # USA in one format
(6, 9, "+12025550104"), # USA in another format
(7, 9, "+68212345"), # Cook Islands
)
],
}
TEST_USER_EMAIL = "test@user.gsa.gov"
def create_test_api_user(state, permissions=None):
user_data = {
"id": 1,
"name": "Test User",
"password": "somepassword",
"email_address": TEST_USER_EMAIL,
"mobile_number": "+12021234123",
"state": state,
"permissions": permissions or {},
}
return user_data
def job_json(
service_id,
created_by,
job_id=None,
template_id=None,
template_version=1,
template_type="sms",
template_name="Example template",
created_at=None,
bucket_name="",
original_file_name="thisisatest.csv",
notification_count=1,
notifications_sent=1,
notifications_requested=1,
job_status="finished",
scheduled_for="",
processing_started=None,
):
if job_id is None:
job_id = str(generate_uuid())
if template_id is None:
template_id = "5d729fbd-239c-44ab-b498-75a985f3198f"
if created_at is None:
created_at = str(datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f%z"))
data = {
"id": job_id,
"service": service_id,
"template": template_id,
"template_name": template_name,
"template_version": template_version,
"template_type": template_type,
"original_file_name": original_file_name,
"created_at": created_at,
"notification_count": notification_count,
"notifications_sent": notifications_sent,
"notifications_requested": notifications_requested,
"job_status": job_status,
"statistics": [
{
"status": "blah",
"count": notifications_requested,
}
],
"created_by": created_by_json(
created_by["id"],
created_by["name"],
created_by["email_address"],
),
}
if scheduled_for:
data.update(scheduled_for=scheduled_for)
if processing_started:
data.update(processing_started=processing_started)
return data
def notification_json(
service_id,
job=None,
template=None,
to=None,
status=None,
sent_at=None,
job_row_number=None,
created_at=None,
updated_at=None,
with_links=False,
rows=5,
personalisation=None,
template_type=None,
reply_to_text=None,
client_reference=None,
created_by_name=None,
):
if template is None:
template = template_json(service_id, str(generate_uuid()), type_=template_type)
if to is None:
if template_type == "email":
to = "example@gsa.gov"
else:
to = "2021234567"
if sent_at is None:
sent_at = str(datetime.utcnow().time())
if created_at is None:
created_at = datetime.now(timezone.utc).isoformat()
if updated_at is None:
updated_at = str((datetime.utcnow() + timedelta(minutes=1)).time())
if status is None:
status = "delivered"
links = {}
if with_links:
links = {
"prev": "/service/{}/notifications?page=0".format(service_id),
"next": "/service/{}/notifications?page=1".format(service_id),
"last": "/service/{}/notifications?page=2".format(service_id),
}
job_payload = None
if job:
job_payload = {"id": job["id"], "original_file_name": job["original_file_name"]}
data = {
"notifications": [
{
"id": sample_uuid(),
"to": to,
"template": template,
"job": job_payload,
"sent_at": sent_at,
"status": status,
"created_at": created_at,
"created_by": None,
"updated_at": updated_at,
"job_row_number": job_row_number,
"service": service_id,
"template_version": template["version"],
"personalisation": personalisation or {},
"notification_type": template_type,
"reply_to_text": reply_to_text,
"client_reference": client_reference,
"created_by_name": None,
}
for i in range(rows)
],
"total": rows,
"page_size": 50,
"links": links,
}
return data
def single_notification_json(
service_id,
job=None,
template=None,
status=None,
sent_at=None,
created_at=None,
updated_at=None,
notification_type="sms",
):
if template is None:
template = template_json(service_id, str(generate_uuid()))
if sent_at is None:
sent_at = str(datetime.utcnow())
if created_at is None:
created_at = str(datetime.utcnow())
if updated_at is None:
updated_at = str(datetime.utcnow() + timedelta(minutes=1))
if status is None:
status = "delivered"
job_payload = None
if job:
job_payload = {"id": job["id"], "original_file_name": job["original_file_name"]}
data = {
"sent_at": sent_at,
"to": "2021234567",
"billable_units": 1,
"status": status,
"created_at": created_at,
"reference": None,
"updated_at": updated_at,
"template_version": 5,
"service": service_id,
"id": "29441662-17ce-4ffe-9502-fcaed73b2826",
"template": template,
"job_row_number": 0,
"notification_type": notification_type,
"api_key": None,
"job": job_payload,
"sent_by": "mmg",
}
return data
def validate_route_permission(
mocker,
notify_admin,
method,
response_code,
route,
permissions,
usr,
service,
session=None,
):
usr["permissions"][str(service["id"])] = permissions
usr["services"] = [service["id"]]
mocker.patch("app.user_api_client.check_verify_code", return_value=(True, ""))
mocker.patch("app.service_api_client.get_services", return_value={"data": []})
mocker.patch("app.service_api_client.update_service", return_value=service)
mocker.patch(
"app.service_api_client.update_service_with_properties", return_value=service
)
mocker.patch("app.user_api_client.get_user", return_value=usr)
mocker.patch("app.user_api_client.get_user_by_email", return_value=usr)
mocker.patch("app.service_api_client.get_service", return_value={"data": service})
mocker.patch("app.models.user.Users.client_method", return_value=[usr])
mocker.patch("app.job_api_client.has_jobs", return_value=False)
with notify_admin.test_request_context():
with notify_admin.test_client() as client:
client.login(usr)
if session:
with client.session_transaction() as session_:
for k, v in session.items():
session_[k] = v
resp = None
if method == "GET":
resp = client.get(route)
elif method == "POST":
resp = client.post(route)
else:
pytest.fail("Invalid method call {}".format(method))
if resp.status_code != response_code:
pytest.fail("Invalid permissions set for endpoint {}".format(route))
return resp
def validate_route_permission_with_client(
mocker, client, method, response_code, route, permissions, usr, service
):
usr["permissions"][str(service["id"])] = permissions
mocker.patch("app.user_api_client.check_verify_code", return_value=(True, ""))
mocker.patch("app.service_api_client.get_services", return_value={"data": []})
mocker.patch("app.service_api_client.update_service", return_value=service)
mocker.patch(
"app.service_api_client.update_service_with_properties", return_value=service
)
mocker.patch("app.user_api_client.get_user", return_value=usr)
mocker.patch("app.user_api_client.get_user_by_email", return_value=usr)
mocker.patch("app.service_api_client.get_service", return_value={"data": service})
mocker.patch("app.user_api_client.get_users_for_service", return_value=[usr])
mocker.patch("app.job_api_client.has_jobs", return_value=False)
client.login(usr)
resp = None
if method == "GET":
resp = client.get_response_from_url(route, _expected_status=response_code)
elif method == "POST":
resp = client.post_response_from_url(route, _expected_status=response_code)
else:
pytest.fail("Invalid method call {}".format(method))
if resp.status_code != response_code:
pytest.fail("Invalid permissions set for endpoint {}".format(route))
return resp
def assert_url_expected(actual, expected):
actual_parts = urlparse(actual)
expected_parts = urlparse(expected)
for attribute in actual_parts._fields:
if attribute == "query":
# query string ordering can be non-deterministic
# so we need to parse it first, which gives us a
# dictionary of keys and values, not a
# serialized string
assert parse_qs(expected_parts.query) == parse_qs(actual_parts.query)
else:
assert getattr(actual_parts, attribute) == getattr(
expected_parts, attribute
), ("Expected redirect: {}\n" "Actual redirect: {}").format(
expected, actual
)
def find_element_by_tag_and_partial_text(page, tag, string):
return [e for e in page.find_all(tag) if string in e.text][0]