Files
notifications-api/app/notifications/validators.py
2021-05-25 08:29:25 +01:00

274 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import current_app
from gds_metrics.metrics import Histogram
from notifications_utils import SMS_CHAR_COUNT_LIMIT
from notifications_utils.clients.redis import (
daily_limit_cache_key,
rate_limit_cache_key,
)
from notifications_utils.postal_address import PostalAddress
from notifications_utils.recipients import (
get_international_phone_info,
validate_and_format_email_address,
validate_and_format_phone_number,
)
from sqlalchemy.orm.exc import NoResultFound
from app import redis_store
from app.dao import services_dao
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
from app.dao.service_letter_contact_dao import dao_get_letter_contact_by_id
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
from app.models import (
EMAIL_TYPE,
INTERNATIONAL_LETTERS,
INTERNATIONAL_SMS_TYPE,
KEY_TYPE_TEAM,
KEY_TYPE_TEST,
LETTER_TYPE,
SMS_TYPE,
ServicePermission,
)
from app.notifications.process_notifications import (
create_content_for_notification,
)
from app.serialised_models import SerialisedTemplate
from app.service.utils import service_allowed_to_send_to
from app.utils import get_public_notify_type_text
from app.v2.errors import (
BadRequestError,
RateLimitError,
TooManyRequestsError,
ValidationError,
)
REDIS_EXCEEDED_RATE_LIMIT_DURATION_SECONDS = Histogram(
'redis_exceeded_rate_limit_duration_seconds',
'Time taken to check rate limit',
)
def check_service_over_api_rate_limit(service, api_key):
if current_app.config['API_RATE_LIMIT_ENABLED'] and current_app.config['REDIS_ENABLED']:
cache_key = rate_limit_cache_key(service.id, api_key.key_type)
rate_limit = service.rate_limit
interval = 60
with REDIS_EXCEEDED_RATE_LIMIT_DURATION_SECONDS.time():
if redis_store.exceeded_rate_limit(cache_key, rate_limit, interval):
current_app.logger.info("service {} has been rate limited for throughput".format(service.id))
raise RateLimitError(rate_limit, interval, api_key.key_type)
def check_service_over_daily_message_limit(key_type, service):
if key_type != KEY_TYPE_TEST and current_app.config['REDIS_ENABLED']:
cache_key = daily_limit_cache_key(service.id)
service_stats = redis_store.get(cache_key)
if not service_stats:
service_stats = services_dao.fetch_todays_total_message_count(service.id)
redis_store.set(cache_key, service_stats, ex=3600)
if int(service_stats) >= service.message_limit:
current_app.logger.info(
"service {} has been rate limited for daily use sent {} limit {}".format(
service.id, int(service_stats), service.message_limit)
)
raise TooManyRequestsError(service.message_limit)
def check_rate_limiting(service, api_key):
check_service_over_api_rate_limit(service, api_key)
# Reduce queries to the notifications table
# check_service_over_daily_message_limit(api_key.key_type, service)
def check_template_is_for_notification_type(notification_type, template_type):
if notification_type != template_type:
message = "{0} template is not suitable for {1} notification".format(template_type,
notification_type)
raise BadRequestError(fields=[{'template': message}], message=message)
def check_template_is_active(template):
if template.archived:
raise BadRequestError(fields=[{'template': 'Template has been deleted'}],
message="Template has been deleted")
def service_can_send_to_recipient(send_to, key_type, service, allow_guest_list_recipients=True):
if not service_allowed_to_send_to(send_to, service, key_type, allow_guest_list_recipients):
if key_type == KEY_TYPE_TEAM:
message = 'Cant send to this recipient using a team-only API key'
else:
message = (
'Cant send to this recipient when service is in trial mode '
' see https://www.notifications.service.gov.uk/trial-mode'
)
raise BadRequestError(message=message)
def service_has_permission(notify_type, permissions):
return notify_type in permissions
def check_service_has_permission(notify_type, permissions):
if not service_has_permission(notify_type, permissions):
raise BadRequestError(message="Service is not allowed to send {}".format(
get_public_notify_type_text(notify_type, plural=True)
))
def check_if_service_can_send_files_by_email(service_contact_link, service_id):
if not service_contact_link:
raise BadRequestError(
message=f"Send files by email has not been set up - add contact details for your service at "
f"{current_app.config['ADMIN_BASE_URL']}/services/{service_id}/service-settings/send-files-by-email"
)
def validate_and_format_recipient(send_to, key_type, service, notification_type, allow_guest_list_recipients=True):
if send_to is None:
raise BadRequestError(message="Recipient can't be empty")
service_can_send_to_recipient(send_to, key_type, service, allow_guest_list_recipients)
if notification_type == SMS_TYPE:
international_phone_info = check_if_service_can_send_to_number(service, send_to)
return validate_and_format_phone_number(
number=send_to,
international=international_phone_info.international
)
elif notification_type == EMAIL_TYPE:
return validate_and_format_email_address(email_address=send_to)
def check_if_service_can_send_to_number(service, number):
international_phone_info = get_international_phone_info(number)
if service.permissions and isinstance(service.permissions[0], ServicePermission):
permissions = [p.permission for p in service.permissions]
else:
permissions = service.permissions
if (
# if number is international and not a crown dependency
international_phone_info.international and not international_phone_info.crown_dependency
) and INTERNATIONAL_SMS_TYPE not in permissions:
raise BadRequestError(message="Cannot send to international mobile numbers")
else:
return international_phone_info
def check_is_message_too_long(template_with_content):
if template_with_content.is_message_too_long():
message = "Your message is too long. "
if template_with_content.template_type == SMS_TYPE:
message += (
f"Text messages cannot be longer than {SMS_CHAR_COUNT_LIMIT} characters. "
f"Your message is {template_with_content.content_count_without_prefix} characters long."
)
elif template_with_content.template_type == EMAIL_TYPE:
message += (
f"Emails cannot be longer than 2000000 bytes. "
f"Your message is {template_with_content.content_size_in_bytes} bytes."
)
raise BadRequestError(message=message)
def check_notification_content_is_not_empty(template_with_content):
if template_with_content.is_message_empty():
message = 'Your message is empty.'
raise BadRequestError(message=message)
def validate_template(template_id, personalisation, service, notification_type, check_char_count=True):
try:
template = SerialisedTemplate.from_id_and_service_id(template_id, service.id)
except NoResultFound:
message = 'Template not found'
raise BadRequestError(message=message,
fields=[{'template': message}])
check_template_is_for_notification_type(notification_type, template.template_type)
check_template_is_active(template)
template_with_content = create_content_for_notification(template, personalisation)
check_notification_content_is_not_empty(template_with_content)
# validating the template in post_notifications happens before the file is uploaded for doc download,
# which means the length of the message can be exceeded because it's including the file.
# The document download feature is only available through the api.
if check_char_count:
check_is_message_too_long(template_with_content)
return template, template_with_content
def check_reply_to(service_id, reply_to_id, type_):
if type_ == EMAIL_TYPE:
return check_service_email_reply_to_id(service_id, reply_to_id, type_)
elif type_ == SMS_TYPE:
return check_service_sms_sender_id(service_id, reply_to_id, type_)
elif type_ == LETTER_TYPE:
return check_service_letter_contact_id(service_id, reply_to_id, type_)
def check_service_email_reply_to_id(service_id, reply_to_id, notification_type):
if reply_to_id:
try:
return dao_get_reply_to_by_id(service_id, reply_to_id).email_address
except NoResultFound:
message = 'email_reply_to_id {} does not exist in database for service id {}' \
.format(reply_to_id, service_id)
raise BadRequestError(message=message)
def check_service_sms_sender_id(service_id, sms_sender_id, notification_type):
if sms_sender_id:
try:
return dao_get_service_sms_senders_by_id(service_id, sms_sender_id).sms_sender
except NoResultFound:
message = 'sms_sender_id {} does not exist in database for service id {}' \
.format(sms_sender_id, service_id)
raise BadRequestError(message=message)
def check_service_letter_contact_id(service_id, letter_contact_id, notification_type):
if letter_contact_id:
try:
return dao_get_letter_contact_by_id(service_id, letter_contact_id).contact_block
except NoResultFound:
message = 'letter_contact_id {} does not exist in database for service id {}' \
.format(letter_contact_id, service_id)
raise BadRequestError(message=message)
def validate_address(service, letter_data):
address = PostalAddress.from_personalisation(
letter_data,
allow_international_letters=(INTERNATIONAL_LETTERS in str(service.permissions)),
)
if not address.has_enough_lines:
raise ValidationError(
message=f'Address must be at least {PostalAddress.MIN_LINES} lines'
)
if address.has_too_many_lines:
raise ValidationError(
message=f'Address must be no more than {PostalAddress.MAX_LINES} lines'
)
if not address.has_valid_last_line:
if address.allow_international_letters:
raise ValidationError(
message='Last line of address must be a real UK postcode or another country'
)
raise ValidationError(
message='Must be a real UK postcode'
)
if address.has_invalid_characters:
raise ValidationError(
message='Address lines must not start with any of the following characters: @ ( ) = [ ] ” \\ / , < > ~'
)
if address.international:
return address.postage
else:
return None