Update the service name validation in the ServiceNameForm and AddServiceForm to check the email_safe version

of the name against a list of all service email_from fields.
Update find_all_service_names to find_all_service_email_from, which returns the email_from of all services.
This commit is contained in:
Rebecca Law
2016-03-31 15:17:05 +01:00
parent 1871243cc8
commit 9a2cb60f5e
9 changed files with 53 additions and 31 deletions

View File

@@ -1,13 +1,14 @@
import re
from flask_wtf import Form
from utils.recipients import (
validate_phone_number,
InvalidPhoneError
)
from wtforms import (
StringField,
PasswordField,
ValidationError,
TextAreaField,
FileField,
RadioField,
BooleanField,
HiddenField
)
@@ -16,12 +17,6 @@ from wtforms.validators import (DataRequired, Email, Length, Regexp)
from app.main.validators import (Blacklist, CsvFileValidator, ValidEmailDomainRegex)
from utils.recipients import (
validate_phone_number,
format_phone_number,
InvalidPhoneError
)
def email_address(label='Email address'):
return EmailField(label, validators=[
@@ -159,7 +154,9 @@ class AddServiceForm(Form):
)
def validate_name(self, a):
if a.data.lower() in self._names_func():
from app.utils import email_safe
# make sure the email_from will be unique to all services
if email_safe(a.data) in self._names_func():
raise ValidationError('This service name is already in use')
@@ -180,7 +177,9 @@ class ServiceNameForm(Form):
])
def validate_name(self, a):
if a.data.lower() in self._names_func():
from app.utils import email_safe
# make sure the email_from will be unique to all services
if email_safe(a.data) in self._names_func():
raise ValidationError('This service name is already in use')

View File

@@ -16,6 +16,7 @@ from app import (
user_api_client,
service_api_client
)
from app.utils import email_safe
@main.route("/add-service", methods=['GET', 'POST'])
@@ -31,12 +32,17 @@ def add_service():
invite_api_client.accept_invite(service_id, invitation.id)
return redirect(url_for('main.service_dashboard', service_id=service_id))
form = AddServiceForm(service_api_client.find_all_service_names_lower)
form = AddServiceForm(service_api_client.find_all_service_email_from)
heading = 'Which service do you want to set up notifications for?'
if form.validate_on_submit():
session['service_name'] = form.name.data
service_id = service_api_client.create_service(
session['service_name'], False, current_app.config['DEFAULT_SERVICE_LIMIT'], True, session['user_id'])
email_from = email_safe(session['service_name'])
service_id = service_api_client.create_service(service_name=session['service_name'],
active=False,
limit=current_app.config['DEFAULT_SERVICE_LIMIT'],
restricted=True,
user_id=session['user_id'],
email_from=email_from)
return redirect(url_for('main.service_dashboard', service_id=service_id))
else:

View File

@@ -39,7 +39,7 @@ def service_settings(service_id):
def service_name_change(service_id):
service = service_api_client.get_service(service_id)['data']
form = ServiceNameForm(service_api_client.find_all_service_names_lower)
form = ServiceNameForm(service_api_client.find_all_service_email_from)
if form.validate_on_submit():
session['service_name_change'] = form.name.data

View File

@@ -18,7 +18,7 @@ class ServiceAPIClient(NotificationsAPIClient):
self.client_id = application.config['ADMIN_CLIENT_USER_NAME']
self.secret = application.config['ADMIN_CLIENT_SECRET']
def create_service(self, service_name, active, limit, restricted, user_id):
def create_service(self, service_name, active, limit, restricted, user_id, email_from):
"""
Create a service and return the json.
"""
@@ -27,7 +27,8 @@ class ServiceAPIClient(NotificationsAPIClient):
"active": active,
"limit": limit,
"user_id": user_id,
"restricted": restricted
"restricted": restricted,
"email_from": email_from
}
return self.post("/service", data)['data']['id']
@@ -142,9 +143,9 @@ class ServiceAPIClient(NotificationsAPIClient):
endpoint = "/service/{0}/template/{1}".format(service_id, template_id)
return self.delete(endpoint)
def find_all_service_names_lower(self, user_id=None):
def find_all_service_email_from(self, user_id=None):
resp = self.get_services(user_id)
return [x['name'].lower() for x in resp['data']]
return [x['email_from'] for x in resp['data']]
class ServicesBrowsableItem(BrowsableItem):

View File

@@ -4,7 +4,7 @@ from werkzeug.datastructures import MultiDict
def test_form_should_have_errors_when_duplicate_service_is_added(app_):
def _get_form_names():
return ['some service', 'more names']
return ['some.service', 'more.names']
with app_.test_request_context():
form = AddServiceForm(_get_form_names,
formdata=MultiDict([('name', 'some service')]))

View File

@@ -1,5 +1,7 @@
from flask import url_for
import app
def test_get_should_render_add_service_template(app_,
api_user_active,
@@ -26,9 +28,12 @@ def test_should_add_service_and_redirect_to_next_page(app_,
assert response.status_code == 302
assert response.location == url_for('main.service_dashboard', service_id=101, _external=True)
assert mock_get_services.called
mock_create_service.asset_called_once_with('testing the post', False,
app_.config['DEFAULT_SERVICE_LIMIT'],
True, api_user_active.id)
mock_create_service.asset_called_once_with(service_name='testing the post',
active=False,
limit=app_.config['DEFAULT_SERVICE_LIMIT'],
restricted=True,
user_id=api_user_active.id,
email_from='testing.the.post')
def test_should_return_form_errors_when_service_name_is_empty(app_,
@@ -45,15 +50,16 @@ def test_should_return_form_errors_when_service_name_is_empty(app_,
def test_should_return_form_errors_with_duplicate_service_name_regardless_of_case(app_,
mocker,
service_one,
mock_get_services,
api_user_active,
mock_create_service):
with app_.test_request_context():
with app_.test_client() as client:
client.login(api_user_active, mocker, service_one)
response = client.post(url_for('main.add_service'), data={'name': 'SERVICE_TWO'})
mocker.patch('app.service_api_client.find_all_service_email_from',
return_value=['service_one', 'service.two'])
response = client.post(url_for('main.add_service'), data={'name': 'SERVICE TWO'})
assert response.status_code == 200
assert 'This service name is already in use' in response.get_data(as_text=True)
assert mock_get_services.called
app.service_api_client.find_all_service_email_from.assert_called_once_with()
assert not mock_create_service.called

View File

@@ -58,19 +58,21 @@ def test_should_redirect_after_change_service_name(app_,
def test_should_not_allow_duplicate_names(app_,
active_user_with_permissions,
mocker,
service_one,
mock_get_services):
service_one):
with app_.test_request_context():
with app_.test_client() as client:
client.login(active_user_with_permissions, mocker, service_one)
mocker.patch('app.service_api_client.find_all_service_email_from',
return_value=['service_one', 'service.two'])
service_id = service_one['id']
response = client.post(
url_for('main.service_name_change', service_id=service_id),
data={'name': "SErvICE_TWO"})
data={'name': "SErvICE TWO"})
assert response.status_code == 200
resp_data = response.get_data(as_text=True)
assert 'This service name is already in use' in resp_data
app.service_api_client.find_all_service_email_from.assert_called_once_with()
def test_should_show_service_name_confirmation(app_,

8
tests/app/test_utils.py Normal file
View File

@@ -0,0 +1,8 @@
from app.utils import email_safe
def test_email_safe_return_dot_separated_email_domain():
test_name = 'SOME service with+stuff+ b123'
expected = 'some.service.withstuff.b123'
actual = email_safe(test_name)
assert actual == expected

View File

@@ -60,10 +60,10 @@ def mock_get_service(mocker, api_user_active):
@pytest.fixture(scope='function')
def mock_create_service(mocker):
def _create(service_name, active, limit, restricted, user_id):
def _create(service_name, active, limit, restricted, user_id, email_from):
service = service_json(
101, service_name, [user_id], limit=limit,
active=active, restricted=restricted)
active=active, restricted=restricted, email_from=email_from)
return service['id']
return mocker.patch(