mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-02 17:31:14 -05:00
Merge branch 'master' into add-reply-to-notifications
This commit is contained in:
@@ -163,7 +163,7 @@ def test_get_all_templates_for_service(notify_db, notify_db_session, service_fac
|
||||
assert len(dao_get_all_templates_for_service(service_2.id)) == 2
|
||||
|
||||
|
||||
def test_get_all_templates_for_service_shows_newest_created_first(notify_db, notify_db_session, sample_service):
|
||||
def test_get_all_templates_for_service_is_alphabetised(notify_db, notify_db_session, sample_service):
|
||||
template_1 = create_sample_template(
|
||||
notify_db,
|
||||
notify_db_session,
|
||||
@@ -190,14 +190,14 @@ def test_get_all_templates_for_service_shows_newest_created_first(notify_db, not
|
||||
)
|
||||
|
||||
assert Template.query.count() == 3
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[0].name == 'Sample Template 3'
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[0].name == 'Sample Template 1'
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[1].name == 'Sample Template 2'
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[2].name == 'Sample Template 1'
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[2].name == 'Sample Template 3'
|
||||
|
||||
template_2.name = 'Sample Template 2 (updated)'
|
||||
template_2.name = 'AAAAA Sample Template 2'
|
||||
dao_update_template(template_2)
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[0].name == 'Sample Template 3'
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[1].name == 'Sample Template 2 (updated)'
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[0].name == 'AAAAA Sample Template 2'
|
||||
assert dao_get_all_templates_for_service(sample_service.id)[1].name == 'Sample Template 1'
|
||||
|
||||
|
||||
def test_get_all_returns_empty_list_if_no_templates(sample_service):
|
||||
|
||||
@@ -121,12 +121,13 @@ def create_service_with_defined_sms_sender(
|
||||
def create_template(
|
||||
service,
|
||||
template_type=SMS_TYPE,
|
||||
template_name=None,
|
||||
subject='Template subject',
|
||||
content='Dear Sir/Madam, Hello. Yours Truly, The Government.',
|
||||
template_id=None
|
||||
):
|
||||
data = {
|
||||
'name': '{} Template Name'.format(template_type),
|
||||
'name': template_name or '{} Template Name'.format(template_type),
|
||||
'template_type': template_type,
|
||||
'content': content,
|
||||
'service': service,
|
||||
|
||||
@@ -1,28 +1,20 @@
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
from flask import json
|
||||
from freezegun import freeze_time
|
||||
|
||||
from tests import create_authorization_header
|
||||
from tests.app.db import create_inbound_sms, create_service, create_service_with_inbound_number
|
||||
|
||||
|
||||
def test_get_inbound_sms_with_no_params(client, sample_service):
|
||||
def test_post_to_get_inbound_sms_with_no_params(admin_request, sample_service):
|
||||
one = create_inbound_sms(sample_service)
|
||||
two = create_inbound_sms(sample_service)
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
data = {}
|
||||
|
||||
response = client.post(
|
||||
path='/service/{}/inbound-sms'.format(sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
sms = json_resp['data']
|
||||
sms = admin_request.post(
|
||||
'inbound_sms.post_query_inbound_sms_for_service',
|
||||
service_id=sample_service.id,
|
||||
_data={}
|
||||
)['data']
|
||||
|
||||
assert len(sms) == 2
|
||||
assert {inbound['id'] for inbound in sms} == {str(one.id), str(two.id)}
|
||||
@@ -37,40 +29,34 @@ def test_get_inbound_sms_with_no_params(client, sample_service):
|
||||
}
|
||||
|
||||
|
||||
def test_get_inbound_sms_with_limit(client, sample_service):
|
||||
def test_post_to_get_inbound_sms_with_limit(admin_request, sample_service):
|
||||
with freeze_time('2017-01-01'):
|
||||
one = create_inbound_sms(sample_service)
|
||||
with freeze_time('2017-01-02'):
|
||||
two = create_inbound_sms(sample_service)
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
data = {'limit': 1}
|
||||
|
||||
response = client.post(
|
||||
path='/service/{}/inbound-sms'.format(sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
sms = json_resp['data']
|
||||
sms = admin_request.post(
|
||||
'inbound_sms.post_query_inbound_sms_for_service',
|
||||
service_id=sample_service.id,
|
||||
_data=data
|
||||
)['data']
|
||||
|
||||
assert len(sms) == 1
|
||||
assert sms[0]['id'] == str(two.id)
|
||||
|
||||
|
||||
def test_get_inbound_sms_should_error_with_invalid_limit(client, sample_service):
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
def test_post_to_get_inbound_sms_should_error_with_invalid_limit(admin_request, sample_service):
|
||||
data = {'limit': 'limit'}
|
||||
|
||||
response = client.post(
|
||||
path='/service/{}/inbound-sms'.format(sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
error_resp = admin_request.post(
|
||||
'inbound_sms.post_query_inbound_sms_for_service',
|
||||
service_id=sample_service.id,
|
||||
_data=data,
|
||||
_expected_status=400
|
||||
)
|
||||
|
||||
error_resp = json.loads(response.get_data(as_text=True))
|
||||
assert error_resp['status_code'] == 400
|
||||
assert error_resp['errors'] == [{
|
||||
'error': 'ValidationError',
|
||||
@@ -78,73 +64,61 @@ def test_get_inbound_sms_should_error_with_invalid_limit(client, sample_service)
|
||||
}]
|
||||
|
||||
|
||||
def test_get_inbound_sms_should_error_with_invalid_phone_number(client, sample_service):
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
data = {'phone_number': 'invalid phone number'}
|
||||
|
||||
response = client.post(
|
||||
path='/service/{}/inbound-sms'.format(sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
|
||||
error_resp = json.loads(response.get_data(as_text=True))
|
||||
assert error_resp['status_code'] == 400
|
||||
assert error_resp['errors'] == [{
|
||||
'error': 'ValidationError',
|
||||
'message': "phone_number Must not contain letters or symbols"
|
||||
}]
|
||||
|
||||
|
||||
@pytest.mark.parametrize('user_number', [
|
||||
'(07700) 900-001',
|
||||
'+4407700900001',
|
||||
'447700900001',
|
||||
])
|
||||
def test_get_inbound_sms_filters_user_number(client, sample_service, user_number):
|
||||
def test_post_to_get_inbound_sms_filters_user_number(admin_request, sample_service, user_number):
|
||||
# user_number in the db is international and normalised
|
||||
one = create_inbound_sms(sample_service, user_number='447700900001')
|
||||
two = create_inbound_sms(sample_service, user_number='447700900002')
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
data = {
|
||||
'limit': 1,
|
||||
'phone_number': user_number
|
||||
}
|
||||
|
||||
response = client.post(
|
||||
path='/service/{}/inbound-sms'.format(sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
sms = admin_request.post(
|
||||
'inbound_sms.post_query_inbound_sms_for_service',
|
||||
service_id=sample_service.id,
|
||||
_data=data
|
||||
)['data']
|
||||
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
sms = json_resp['data']
|
||||
assert len(sms) == 1
|
||||
assert sms[0]['id'] == str(one.id)
|
||||
assert sms[0]['user_number'] == str(one.user_number)
|
||||
|
||||
|
||||
def test_get_inbound_sms_filters_international_user_number(admin_request, sample_service):
|
||||
def test_post_to_get_inbound_sms_filters_international_user_number(admin_request, sample_service):
|
||||
# user_number in the db is international and normalised
|
||||
one = create_inbound_sms(sample_service, user_number='12025550104')
|
||||
two = create_inbound_sms(sample_service)
|
||||
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
data = {
|
||||
'limit': 1,
|
||||
'phone_number': '+1 (202) 555-0104'
|
||||
}
|
||||
|
||||
response = client.post(
|
||||
path='/service/{}/inbound-sms'.format(sample_service.id),
|
||||
data=json.dumps(data),
|
||||
headers=[('Content-Type', 'application/json'), auth_header])
|
||||
sms = admin_request.post(
|
||||
'inbound_sms.post_query_inbound_sms_for_service',
|
||||
service_id=sample_service.id,
|
||||
_data=data
|
||||
)['data']
|
||||
|
||||
json_resp = json.loads(response.get_data(as_text=True))
|
||||
sms = json_resp['data']
|
||||
assert len(sms) == 1
|
||||
assert sms[0]['id'] == str(one.id)
|
||||
assert sms[0]['user_number'] == str(one.user_number)
|
||||
|
||||
|
||||
def test_post_to_get_inbound_sms_allows_badly_formatted_number(admin_request, sample_service):
|
||||
one = create_inbound_sms(sample_service, user_number='ALPHANUM3R1C')
|
||||
|
||||
sms = admin_request.post(
|
||||
'inbound_sms.get_inbound_sms_for_service',
|
||||
service_id=sample_service.id,
|
||||
_data={'phone_number': 'ALPHANUM3R1C'}
|
||||
)['data']
|
||||
|
||||
assert len(sms) == 1
|
||||
assert sms[0]['id'] == str(one.id)
|
||||
@@ -156,7 +130,7 @@ def test_get_inbound_sms_filters_international_user_number(admin_request, sample
|
||||
##############################################################
|
||||
|
||||
|
||||
def test_get_inbound_sms(admin_request, sample_service):
|
||||
def test_old_get_inbound_sms(admin_request, sample_service):
|
||||
one = create_inbound_sms(sample_service)
|
||||
two = create_inbound_sms(sample_service)
|
||||
|
||||
@@ -180,7 +154,7 @@ def test_get_inbound_sms(admin_request, sample_service):
|
||||
}
|
||||
|
||||
|
||||
def test_get_inbound_sms_limits(admin_request, sample_service):
|
||||
def test_old_get_inbound_sms_limits(admin_request, sample_service):
|
||||
with freeze_time('2017-01-01'):
|
||||
one = create_inbound_sms(sample_service)
|
||||
with freeze_time('2017-01-02'):
|
||||
@@ -201,7 +175,7 @@ def test_get_inbound_sms_limits(admin_request, sample_service):
|
||||
'+4407700900001',
|
||||
'447700900001',
|
||||
])
|
||||
def test_get_inbound_sms_filters_user_number(admin_request, sample_service, user_number):
|
||||
def test_old_get_inbound_sms_filters_user_number(admin_request, sample_service, user_number):
|
||||
# user_number in the db is international and normalised
|
||||
one = create_inbound_sms(sample_service, user_number='447700900001')
|
||||
two = create_inbound_sms(sample_service, user_number='447700900002')
|
||||
@@ -217,7 +191,7 @@ def test_get_inbound_sms_filters_user_number(admin_request, sample_service, user
|
||||
assert sms['data'][0]['user_number'] == str(one.user_number)
|
||||
|
||||
|
||||
def test_get_inbound_sms_filters_international_user_number(admin_request, sample_service):
|
||||
def test_old_get_inbound_sms_filters_international_user_number(admin_request, sample_service):
|
||||
# user_number in the db is international and normalised
|
||||
one = create_inbound_sms(sample_service, user_number='12025550104')
|
||||
two = create_inbound_sms(sample_service)
|
||||
@@ -233,6 +207,20 @@ def test_get_inbound_sms_filters_international_user_number(admin_request, sample
|
||||
assert sms['data'][0]['user_number'] == str(one.user_number)
|
||||
|
||||
|
||||
def test_old_get_inbound_sms_allows_badly_formatted_number(admin_request, sample_service):
|
||||
one = create_inbound_sms(sample_service, user_number='ALPHANUM3R1C')
|
||||
|
||||
sms = admin_request.get(
|
||||
'inbound_sms.get_inbound_sms_for_service',
|
||||
service_id=sample_service.id,
|
||||
user_number='ALPHANUM3R1C',
|
||||
)
|
||||
|
||||
assert len(sms['data']) == 1
|
||||
assert sms['data'][0]['id'] == str(one.id)
|
||||
assert sms['data'][0]['user_number'] == str(one.user_number)
|
||||
|
||||
|
||||
##############################
|
||||
# End delete section
|
||||
##############################
|
||||
|
||||
@@ -413,3 +413,24 @@ def test_firetext_inbound_sms_auth(notify_db_session, notify_api, client, mocker
|
||||
with set_config(notify_api, 'FIRETEXT_INBOUND_SMS_AUTH', keys):
|
||||
response = firetext_post(client, data, auth=bool(auth), password=auth)
|
||||
assert response.status_code == status_code
|
||||
|
||||
|
||||
def test_create_inbound_sms_object_works_with_alphanumeric_sender(sample_service_full_permissions):
|
||||
data = {
|
||||
'Message': 'hello',
|
||||
'Number': sample_service_full_permissions.get_inbound_number(),
|
||||
'MSISDN': 'ALPHANUM3R1C',
|
||||
'DateRecieved': '2017-01-02+03%3A04%3A05',
|
||||
'ID': 'bar',
|
||||
}
|
||||
|
||||
inbound_sms = create_inbound_sms_object(
|
||||
service=sample_service_full_permissions,
|
||||
content=format_mmg_message(data["Message"]),
|
||||
from_number='ALPHANUM3R1C',
|
||||
provider_ref='foo',
|
||||
date_received=None,
|
||||
provider_name="mmg"
|
||||
)
|
||||
|
||||
assert inbound_sms.user_number == 'ALPHANUM3R1C'
|
||||
|
||||
@@ -301,10 +301,10 @@ def test_should_be_able_to_get_all_templates_for_a_service(client, sample_user,
|
||||
|
||||
assert response.status_code == 200
|
||||
update_json_resp = json.loads(response.get_data(as_text=True))
|
||||
assert update_json_resp['data'][0]['name'] == 'my template 2'
|
||||
assert update_json_resp['data'][0]['name'] == 'my template 1'
|
||||
assert update_json_resp['data'][0]['version'] == 1
|
||||
assert update_json_resp['data'][0]['created_at']
|
||||
assert update_json_resp['data'][1]['name'] == 'my template 1'
|
||||
assert update_json_resp['data'][1]['name'] == 'my template 2'
|
||||
assert update_json_resp['data'][1]['version'] == 1
|
||||
assert update_json_resp['data'][1]['created_at']
|
||||
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
from datetime import datetime
|
||||
|
||||
from app.commands import BackfillProcessingTime
|
||||
from app.commands import backfill_processing_time
|
||||
|
||||
|
||||
def test_backfill_processing_time_works_for_correct_dates(mocker):
|
||||
def test_backfill_processing_time_works_for_correct_dates(mocker, notify_api):
|
||||
send_mock = mocker.patch('app.commands.send_processing_time_for_start_and_end')
|
||||
|
||||
BackfillProcessingTime().run('2017-08-01', '2017-08-03')
|
||||
# backfill_processing_time is a click.Command object - if you try invoking the callback on its own, it
|
||||
# throws a `RuntimeError: There is no active click context.` - so get at the original function using __wrapped__
|
||||
backfill_processing_time.callback.__wrapped__('2017-08-01', '2017-08-03')
|
||||
|
||||
assert send_mock.call_count == 3
|
||||
send_mock.assert_any_call(datetime(2017, 7, 31, 23, 0), datetime(2017, 8, 1, 23, 0))
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from flask import json
|
||||
from itertools import product
|
||||
|
||||
from app.models import TEMPLATE_TYPES, EMAIL_TYPE
|
||||
from tests import create_authorization_header
|
||||
@@ -8,12 +9,15 @@ from tests.app.db import create_template
|
||||
|
||||
|
||||
def test_get_all_templates_returns_200(client, sample_service):
|
||||
num_templates = 3
|
||||
templates = []
|
||||
for i in range(num_templates):
|
||||
for tmp_type in TEMPLATE_TYPES:
|
||||
subject = 'subject_{}'.format(i) if tmp_type == EMAIL_TYPE else ''
|
||||
templates.append(create_template(sample_service, template_type=tmp_type, subject=subject))
|
||||
templates = [
|
||||
create_template(
|
||||
sample_service,
|
||||
template_type=tmp_type,
|
||||
subject='subject_{}'.format(name) if tmp_type == EMAIL_TYPE else '',
|
||||
template_name=name,
|
||||
)
|
||||
for name, tmp_type in product(('A', 'B', 'C'), TEMPLATE_TYPES)
|
||||
]
|
||||
|
||||
auth_header = create_authorization_header(service_id=sample_service.id)
|
||||
|
||||
@@ -25,25 +29,27 @@ def test_get_all_templates_returns_200(client, sample_service):
|
||||
|
||||
json_response = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert len(json_response['templates']) == num_templates * len(TEMPLATE_TYPES)
|
||||
assert len(json_response['templates']) == len(templates)
|
||||
|
||||
# need to reverse index as get all templates returns list sorted by descending date
|
||||
for i in range(len(json_response['templates'])):
|
||||
reverse_index = len(json_response['templates']) - 1 - i
|
||||
assert json_response['templates'][reverse_index]['id'] == str(templates[i].id)
|
||||
assert json_response['templates'][reverse_index]['body'] == templates[i].content
|
||||
assert json_response['templates'][reverse_index]['type'] == templates[i].template_type
|
||||
if templates[i].template_type == EMAIL_TYPE:
|
||||
assert json_response['templates'][reverse_index]['subject'] == templates[i].subject
|
||||
for index, template in enumerate(json_response['templates']):
|
||||
assert template['id'] == str(templates[index].id)
|
||||
assert template['body'] == templates[index].content
|
||||
assert template['type'] == templates[index].template_type
|
||||
if templates[index].template_type == EMAIL_TYPE:
|
||||
assert template['subject'] == templates[index].subject
|
||||
|
||||
|
||||
@pytest.mark.parametrize("tmp_type", TEMPLATE_TYPES)
|
||||
def test_get_all_templates_for_valid_type_returns_200(client, sample_service, tmp_type):
|
||||
num_templates = 3
|
||||
templates = []
|
||||
for i in range(num_templates):
|
||||
subject = 'subject_{}'.format(i) if tmp_type == EMAIL_TYPE else ''
|
||||
templates.append(create_template(sample_service, template_type=tmp_type, subject=subject))
|
||||
templates = [
|
||||
create_template(
|
||||
sample_service,
|
||||
template_type=tmp_type,
|
||||
template_name='Template {}'.format(i),
|
||||
subject='subject_{}'.format(i) if tmp_type == EMAIL_TYPE else ''
|
||||
)
|
||||
for i in range(3)
|
||||
]
|
||||
|
||||
auth_header = create_authorization_header(service_id=sample_service.id)
|
||||
|
||||
@@ -55,16 +61,14 @@ def test_get_all_templates_for_valid_type_returns_200(client, sample_service, tm
|
||||
|
||||
json_response = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert len(json_response['templates']) == num_templates
|
||||
assert len(json_response['templates']) == len(templates)
|
||||
|
||||
# need to reverse index as get all templates returns list sorted by descending date
|
||||
for i in range(len(json_response['templates'])):
|
||||
reverse_index = len(json_response['templates']) - 1 - i
|
||||
assert json_response['templates'][reverse_index]['id'] == str(templates[i].id)
|
||||
assert json_response['templates'][reverse_index]['body'] == templates[i].content
|
||||
assert json_response['templates'][reverse_index]['type'] == tmp_type
|
||||
if templates[i].template_type == EMAIL_TYPE:
|
||||
assert json_response['templates'][reverse_index]['subject'] == templates[i].subject
|
||||
for index, template in enumerate(json_response['templates']):
|
||||
assert template['id'] == str(templates[index].id)
|
||||
assert template['body'] == templates[index].content
|
||||
assert template['type'] == tmp_type
|
||||
if templates[index].template_type == EMAIL_TYPE:
|
||||
assert template['subject'] == templates[index].subject
|
||||
|
||||
|
||||
@pytest.mark.parametrize("tmp_type", TEMPLATE_TYPES)
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
from contextlib import contextmanager
|
||||
import os
|
||||
|
||||
from flask import Flask
|
||||
from alembic.command import upgrade
|
||||
from alembic.config import Config
|
||||
from flask_migrate import Migrate, MigrateCommand
|
||||
from flask_script import Manager
|
||||
import boto3
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
@@ -14,7 +13,8 @@ from app import create_app, db
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def notify_api():
|
||||
app = create_app()
|
||||
app = Flask('test')
|
||||
create_app(app)
|
||||
|
||||
# deattach server-error error handlers - error_handler_spec looks like:
|
||||
# {'blueprint_name': {
|
||||
@@ -76,8 +76,6 @@ def notify_db(notify_api, worker_id):
|
||||
current_app.config['SQLALCHEMY_DATABASE_URI'] += '_{}'.format(worker_id)
|
||||
create_test_db(current_app.config['SQLALCHEMY_DATABASE_URI'])
|
||||
|
||||
Migrate(notify_api, db)
|
||||
Manager(db, MigrateCommand)
|
||||
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
|
||||
ALEMBIC_CONFIG = os.path.join(BASE_DIR, 'migrations')
|
||||
config = Config(ALEMBIC_CONFIG + '/alembic.ini')
|
||||
|
||||
Reference in New Issue
Block a user