mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-28 13:31:54 -05:00
more
This commit is contained in:
4
Makefile
4
Makefile
@@ -60,8 +60,8 @@ test: export NEW_RELIC_ENVIRONMENT=test
|
||||
test: ## Run tests and create coverage report
|
||||
pipenv run flake8 .
|
||||
pipenv run isort --check-only ./app ./tests
|
||||
pipenv run coverage run -m pytest --maxfail=10
|
||||
pipenv run coverage report --fail-under=92
|
||||
pipenv run coverage run -m pytest --maxfail=10
|
||||
pipenv run coverage report -m --fail-under=92
|
||||
pipenv run coverage html -d .coverage_cache
|
||||
|
||||
.PHONY: freeze-requirements
|
||||
|
||||
@@ -139,6 +139,8 @@ def purge_functional_test_data(user_email_prefix):
|
||||
print(f"Deleting user {usr.id} which is not part of any services")
|
||||
delete_user_verify_codes(usr)
|
||||
delete_model_user(usr)
|
||||
raise Exception(f"RAN with user {usr}")
|
||||
raise Exception("NO USERS")
|
||||
|
||||
|
||||
@notify_command(name='insert-inbound-numbers')
|
||||
@@ -248,7 +250,8 @@ def bulk_invite_user_to_service(file_name, service_id, user_id, auth_type, permi
|
||||
@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d'))
|
||||
def update_jobs_archived_flag(start_date, end_date):
|
||||
current_app.logger.info('Archiving jobs created between {} to {}'.format(start_date, end_date))
|
||||
|
||||
print('Archiving jobs created between {} to {}'.format(start_date, end_date))
|
||||
|
||||
process_date = start_date
|
||||
total_updated = 0
|
||||
@@ -258,10 +261,9 @@ def update_jobs_archived_flag(start_date, end_date):
|
||||
sql = """update
|
||||
jobs set archived = true
|
||||
where
|
||||
created_at >= (date :start + time '00:00:00') at time zone 'America/New_York'
|
||||
at time zone 'UTC'
|
||||
and created_at < (date :end + time '00:00:00') at time zone 'America/New_York' at time zone 'UTC'"""
|
||||
|
||||
created_at >= (date :start + time '00:00:00')
|
||||
and created_at < (date :end + time '00:00:00')
|
||||
"""
|
||||
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
|
||||
db.session.commit()
|
||||
current_app.logger.info('jobs: --- Completed took {}ms. Archived {} jobs for {}'.format(
|
||||
@@ -286,7 +288,10 @@ def populate_organizations_from_file(file_name):
|
||||
# The expectation is that the organization, organization_to_service
|
||||
# and user_to_organization will be cleared before running this command.
|
||||
# Ignoring duplicates allows us to run the command again with the same file or same file with new rows.
|
||||
msg("enter")
|
||||
with open(file_name, 'r') as f:
|
||||
msg(f"\nfile is open {file_name}")
|
||||
|
||||
def boolean_or_none(field):
|
||||
if field == '1':
|
||||
return True
|
||||
@@ -296,10 +301,13 @@ def populate_organizations_from_file(file_name):
|
||||
return None
|
||||
|
||||
for line in itertools.islice(f, 1, None):
|
||||
msg(f"XXX line {line}")
|
||||
columns = line.split('|')
|
||||
msg(f"XXX columns {columns}")
|
||||
print(columns)
|
||||
email_branding = None
|
||||
email_branding_column = columns[5].strip()
|
||||
msg(f"XXX email_branding_column {email_branding_column}")
|
||||
if len(email_branding_column) > 0:
|
||||
email_branding = EmailBranding.query.filter(EmailBranding.name == email_branding_column).one()
|
||||
data = {
|
||||
@@ -309,12 +317,14 @@ def populate_organizations_from_file(file_name):
|
||||
'organization_type': columns[1].lower(),
|
||||
'email_branding_id': email_branding.id if email_branding else None
|
||||
}
|
||||
msg(f"XXX data {data}")
|
||||
org = Organization(**data)
|
||||
try:
|
||||
db.session.add(org)
|
||||
db.session.commit()
|
||||
except IntegrityError:
|
||||
print("duplicate org", org.name)
|
||||
msg("XXX duplicate org")
|
||||
db.session.rollback()
|
||||
domains = columns[4].split(',')
|
||||
for d in domains:
|
||||
@@ -324,10 +334,17 @@ def populate_organizations_from_file(file_name):
|
||||
db.session.add(domain)
|
||||
db.session.commit()
|
||||
except IntegrityError:
|
||||
msg("XXX duplicate domain")
|
||||
print("duplicate domain", d.strip())
|
||||
db.session.rollback()
|
||||
|
||||
|
||||
def msg(msg):
|
||||
f = open("huhhh.txt", "a")
|
||||
f.write(msg)
|
||||
f.close()
|
||||
|
||||
|
||||
@notify_command(name='populate-organization-agreement-details-from-file')
|
||||
@click.option('-f', '--file_name', required=True,
|
||||
help="CSV file containing id, agreement_signed_version, "
|
||||
@@ -342,15 +359,12 @@ def populate_organization_agreement_details_from_file(file_name):
|
||||
"""
|
||||
with open(file_name) as f:
|
||||
csv_reader = csv.reader(f)
|
||||
|
||||
# ignore the header row
|
||||
next(csv_reader)
|
||||
|
||||
for row in csv_reader:
|
||||
org = dao_get_organization_by_id(row[0])
|
||||
|
||||
current_app.logger.info(f"Updating {org.name}")
|
||||
|
||||
if not org.agreement_signed:
|
||||
raise RuntimeError('Agreement was not signed')
|
||||
|
||||
|
||||
@@ -106,7 +106,7 @@ class UserSchema(BaseSchema):
|
||||
permissions = fields.Method("user_permissions", dump_only=True)
|
||||
password_changed_at = field_for(models.User, 'password_changed_at', format=DATETIME_FORMAT_NO_TIMEZONE)
|
||||
created_at = field_for(models.User, 'created_at', format=DATETIME_FORMAT_NO_TIMEZONE)
|
||||
updated_at = FlexibleDateTime()
|
||||
updated_atx = FlexibleDateTime()
|
||||
logged_in_at = FlexibleDateTime()
|
||||
auth_type = field_for(models.User, 'auth_type')
|
||||
password = fields.String(required=True, load_only=True)
|
||||
|
||||
@@ -28,6 +28,7 @@ from app.dao.services_dao import (
|
||||
dao_fetch_all_services_by_user,
|
||||
dao_fetch_live_services_data,
|
||||
dao_fetch_service_by_id,
|
||||
dao_fetch_service_by_id_with_api_keys,
|
||||
dao_fetch_service_by_inbound_number,
|
||||
dao_fetch_todays_stats_for_all_services,
|
||||
dao_fetch_todays_stats_for_service,
|
||||
@@ -133,6 +134,39 @@ def test_create_service_with_organization(notify_db_session):
|
||||
assert service.organization == organization
|
||||
|
||||
|
||||
def test_fetch_service_by_id_with_api_keys(notify_db_session):
|
||||
user = create_user(email='local.authority@local-authority.gov.uk')
|
||||
organization = create_organization(
|
||||
name='Some local authority', organization_type='state', domains=['local-authority.gov.uk'])
|
||||
assert Service.query.count() == 0
|
||||
service = Service(name="service_name",
|
||||
email_from="email_from",
|
||||
message_limit=1000,
|
||||
restricted=False,
|
||||
organization_type='federal',
|
||||
created_by=user)
|
||||
dao_create_service(service, user)
|
||||
assert Service.query.count() == 1
|
||||
service_db = Service.query.one()
|
||||
organization = Organization.query.get(organization.id)
|
||||
assert service_db.name == "service_name"
|
||||
assert service_db.id == service.id
|
||||
assert service_db.email_from == 'email_from'
|
||||
assert service_db.research_mode is False
|
||||
assert service_db.prefix_sms is True
|
||||
assert service.active is True
|
||||
assert user in service_db.users
|
||||
assert service_db.organization_type == 'state'
|
||||
assert service.organization_id == organization.id
|
||||
assert service.organization == organization
|
||||
|
||||
service = dao_fetch_service_by_id_with_api_keys(service.id, False)
|
||||
assert service is not None
|
||||
assert service.api_keys is not None
|
||||
service = dao_fetch_service_by_id_with_api_keys(service.id, True)
|
||||
assert service is not None
|
||||
|
||||
|
||||
def test_cannot_create_two_services_with_same_name(notify_db_session):
|
||||
user = create_user()
|
||||
assert Service.query.count() == 0
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import datetime
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from app.commands import (
|
||||
@@ -5,10 +8,171 @@ from app.commands import (
|
||||
create_test_user,
|
||||
insert_inbound_numbers_from_file,
|
||||
populate_annual_billing_with_defaults,
|
||||
populate_annual_billing_with_the_previous_years_allowance,
|
||||
populate_organization_agreement_details_from_file,
|
||||
populate_organizations_from_file,
|
||||
purge_functional_test_data,
|
||||
update_jobs_archived_flag,
|
||||
)
|
||||
from app.dao.inbound_numbers_dao import dao_get_available_inbound_numbers
|
||||
from app.models import AnnualBilling, Template, User
|
||||
from tests.app.db import create_annual_billing, create_service
|
||||
from app.models import AnnualBilling, Job, Organization, Service, Template, User
|
||||
from tests.app.db import (
|
||||
create_annual_billing,
|
||||
create_job,
|
||||
create_organization,
|
||||
create_service,
|
||||
create_template,
|
||||
)
|
||||
|
||||
|
||||
def test_purge_functional_test_data(notify_db_session, notify_api):
|
||||
print("ENTER test_purge_functional_test_data")
|
||||
|
||||
user_count = User.query.count()
|
||||
print(f"INITIAL user count {user_count}")
|
||||
# run the command
|
||||
notify_api.test_cli_runner().invoke(
|
||||
create_test_user, [
|
||||
'--email', 'somebody+7af2cdb0-7cbc-44dc-a5d0-f817fc6ee94e@fake.gov',
|
||||
'--mobile_number', '202-555-5555',
|
||||
'--password', 'correct horse battery staple',
|
||||
'--name', 'Fake Personson',
|
||||
# '--auth_type', 'sms_auth', # this is the default
|
||||
# '--state', 'active', # this is the default
|
||||
# '--admin', 'False', # this is the default
|
||||
]
|
||||
)
|
||||
|
||||
user_count = User.query.count()
|
||||
print(f"AFTER create_test_user {user_count}")
|
||||
assert user_count == 1
|
||||
|
||||
users = User.query.all()
|
||||
for user in users:
|
||||
print(f"USER: {user.email_address} {user.id} {user.name}")
|
||||
|
||||
# run the command
|
||||
print("INVOKING purge_functional_test_data")
|
||||
notify_api.test_cli_runner().invoke(purge_functional_test_data, ['-u', 'somebody'])
|
||||
print("IT WAS INVOKED")
|
||||
# there should be one more user
|
||||
assert User.query.count() == 0
|
||||
|
||||
|
||||
def test_purge_functional_test_data_bad_mobile(notify_db_session, notify_api):
|
||||
|
||||
user_count = User.query.count()
|
||||
assert user_count == 0
|
||||
# run the command
|
||||
x = notify_api.test_cli_runner().invoke(
|
||||
create_test_user, [
|
||||
'--email', 'somebody+7af2cdb0-7cbc-44dc-a5d0-f817fc6ee94e@fake.gov',
|
||||
'--mobile_number', '555-555-55554444',
|
||||
'--password', 'correct horse battery staple',
|
||||
'--name', 'Fake Personson',
|
||||
# '--auth_type', 'sms_auth', # this is the default
|
||||
# '--state', 'active', # this is the default
|
||||
# '--admin', 'False', # this is the default
|
||||
]
|
||||
)
|
||||
print(f"X = {x}")
|
||||
# The bad mobile phone number results in a bad parameter error, leading to a system exit 2 and no entry made in db
|
||||
assert "SystemExit(2)" in str(x)
|
||||
user_count = User.query.count()
|
||||
assert user_count == 0
|
||||
|
||||
|
||||
def test_update_jobs_archived_flag(notify_db_session, notify_api):
|
||||
print("ENTER test_update_jobs_archived_flag")
|
||||
|
||||
service_count = Service.query.count()
|
||||
assert service_count == 0
|
||||
|
||||
service = create_service()
|
||||
service_count = Service.query.count()
|
||||
assert service_count == 1
|
||||
|
||||
sms_template = create_template(service=service, template_type='sms')
|
||||
create_job(sms_template)
|
||||
|
||||
# run the command
|
||||
one_hour_past = datetime.datetime.utcnow()
|
||||
one_hour_future = datetime.datetime.utcnow() + datetime.timedelta(days=1)
|
||||
|
||||
one_hour_past = one_hour_past.strftime("%Y-%m-%d")
|
||||
one_hour_future = one_hour_future.strftime("%Y-%m-%d")
|
||||
print(f"PAST {one_hour_past} FUTURE = {one_hour_future}")
|
||||
|
||||
archived_jobs = Job.query.filter(Job.archived is True).count()
|
||||
assert archived_jobs == 0
|
||||
|
||||
x = notify_api.test_cli_runner().invoke(
|
||||
update_jobs_archived_flag, [
|
||||
'-e', one_hour_future,
|
||||
'-s', one_hour_past,
|
||||
]
|
||||
)
|
||||
print(f"X = {x}")
|
||||
jobs = Job.query.all()
|
||||
assert len(jobs) == 1
|
||||
for job in jobs:
|
||||
assert job.archived is True
|
||||
|
||||
|
||||
def test_populate_organizations_from_file(notify_db_session, notify_api):
|
||||
|
||||
org_count = Organization.query.count()
|
||||
assert org_count == 0
|
||||
|
||||
file_name = "./tests/app/orgs1.csv"
|
||||
text = "name|blah|blah|blah|||\n" \
|
||||
"foo|Federal|True|'foo.gov'|||\n"
|
||||
f = open(file_name, "a")
|
||||
f.write(text)
|
||||
f.close()
|
||||
x = notify_api.test_cli_runner().invoke(
|
||||
populate_organizations_from_file, [
|
||||
'-f', file_name
|
||||
]
|
||||
)
|
||||
|
||||
os.remove(file_name)
|
||||
print(f"X = {x}")
|
||||
|
||||
org_count = Organization.query.count()
|
||||
assert org_count == 1
|
||||
|
||||
|
||||
def test_populate_organization_agreement_details_from_file(notify_db_session, notify_api):
|
||||
file_name = "./tests/app/orgs.csv"
|
||||
|
||||
org_count = Organization.query.count()
|
||||
assert org_count == 0
|
||||
create_organization()
|
||||
org_count = Organization.query.count()
|
||||
assert org_count == 1
|
||||
|
||||
org = Organization.query.one()
|
||||
org.agreement_signed = True
|
||||
notify_db_session.commit()
|
||||
|
||||
text = "id,agreement_signed_version,agreement_signed_on_behalf_of_name,agreement_signed_at\n" \
|
||||
f"{org.id},1,bob,'2023-01-01 00:00:00'\n"
|
||||
f = open(file_name, "a")
|
||||
f.write(text)
|
||||
f.close()
|
||||
x = notify_api.test_cli_runner().invoke(
|
||||
populate_organization_agreement_details_from_file, [
|
||||
'-f', file_name
|
||||
]
|
||||
)
|
||||
print(f"X = {x}")
|
||||
|
||||
org_count = Organization.query.count()
|
||||
assert org_count == 1
|
||||
org = Organization.query.one()
|
||||
assert org.agreement_signed_on_behalf_of_name == 'bob'
|
||||
os.remove(file_name)
|
||||
|
||||
|
||||
def test_create_test_user_command(notify_db_session, notify_api):
|
||||
@@ -73,6 +237,39 @@ def test_populate_annual_billing_with_defaults(
|
||||
assert results[0].free_sms_fragment_limit == expected_allowance
|
||||
|
||||
|
||||
@pytest.mark.parametrize("organization_type, expected_allowance",
|
||||
[('federal', 40000),
|
||||
('state', 40000)])
|
||||
def test_populate_annual_billing_with_the_previous_years_allowance(
|
||||
notify_db_session, notify_api, organization_type, expected_allowance
|
||||
):
|
||||
service = create_service(service_name=organization_type, organization_type=organization_type)
|
||||
|
||||
notify_api.test_cli_runner().invoke(
|
||||
populate_annual_billing_with_defaults, ['-y', 2022]
|
||||
)
|
||||
|
||||
results = AnnualBilling.query.filter(
|
||||
AnnualBilling.financial_year_start == 2022,
|
||||
AnnualBilling.service_id == service.id
|
||||
).all()
|
||||
|
||||
assert len(results) == 1
|
||||
assert results[0].free_sms_fragment_limit == expected_allowance
|
||||
|
||||
notify_api.test_cli_runner().invoke(
|
||||
populate_annual_billing_with_the_previous_years_allowance, ['-y', 2023]
|
||||
)
|
||||
|
||||
results = AnnualBilling.query.filter(
|
||||
AnnualBilling.financial_year_start == 2023,
|
||||
AnnualBilling.service_id == service.id
|
||||
).all()
|
||||
|
||||
assert len(results) == 1
|
||||
assert results[0].free_sms_fragment_limit == expected_allowance
|
||||
|
||||
|
||||
def test_populate_annual_billing_with_defaults_sets_free_allowance_to_zero_if_previous_year_is_zero(
|
||||
notify_db_session, notify_api
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user