mirror of
https://github.com/GSA/notifications-admin.git
synced 2025-12-09 06:33:52 -05:00
remove easy targets
This commit is contained in:
@@ -30,7 +30,7 @@ from werkzeug.local import LocalProxy
|
||||
from app import proxy_fix
|
||||
from app.asset_fingerprinter import asset_fingerprinter
|
||||
from app.config import configs
|
||||
from app.extensions import redis_client, zendesk_client
|
||||
from app.extensions import redis_client
|
||||
from app.formatters import (
|
||||
convert_markdown_template,
|
||||
convert_to_boolean,
|
||||
@@ -202,7 +202,6 @@ def create_app(application):
|
||||
user_api_client,
|
||||
# External API clients
|
||||
redis_client,
|
||||
zendesk_client,
|
||||
):
|
||||
client.init_app(application)
|
||||
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from notifications_utils.clients.redis.redis_client import RedisClient
|
||||
from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient
|
||||
|
||||
zendesk_client = ZendeskClient()
|
||||
redis_client = RedisClient()
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
import requests
|
||||
from flask import current_app
|
||||
|
||||
|
||||
class AntivirusError(Exception):
|
||||
def __init__(self, message=None, status_code=None):
|
||||
self.message = message
|
||||
self.status_code = status_code
|
||||
|
||||
@classmethod
|
||||
def from_exception(cls, e):
|
||||
try:
|
||||
message = e.response.json()["error"]
|
||||
status_code = e.response.status_code
|
||||
except (TypeError, ValueError, AttributeError, KeyError):
|
||||
message = "connection error"
|
||||
status_code = 503
|
||||
|
||||
return cls(message, status_code)
|
||||
|
||||
|
||||
class AntivirusClient:
|
||||
def __init__(self, api_host=None, auth_token=None):
|
||||
self.api_host = api_host
|
||||
self.auth_token = auth_token
|
||||
|
||||
def init_app(self, app):
|
||||
self.api_host = app.config["ANTIVIRUS_API_HOST"]
|
||||
self.auth_token = app.config["ANTIVIRUS_API_KEY"]
|
||||
|
||||
def scan(self, document_stream):
|
||||
try:
|
||||
response = requests.post(
|
||||
"{}/scan".format(self.api_host),
|
||||
headers={
|
||||
"Authorization": "Bearer {}".format(self.auth_token),
|
||||
},
|
||||
files={"document": document_stream},
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
except requests.RequestException as e:
|
||||
error = AntivirusError.from_exception(e)
|
||||
current_app.logger.warning(
|
||||
"Notify Antivirus API request failed with error: {}".format(
|
||||
error.message
|
||||
)
|
||||
)
|
||||
|
||||
raise error
|
||||
finally:
|
||||
document_stream.seek(0)
|
||||
|
||||
return response.json()["ok"]
|
||||
@@ -1,86 +0,0 @@
|
||||
from base64 import urlsafe_b64encode
|
||||
from json import dumps, loads
|
||||
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from itsdangerous import BadSignature, URLSafeSerializer
|
||||
|
||||
|
||||
class EncryptionError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SaltLengthError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Encryption:
|
||||
def init_app(self, app):
|
||||
self._serializer = URLSafeSerializer(app.config.get("SECRET_KEY"))
|
||||
self._salt = app.config.get("DANGEROUS_SALT")
|
||||
self._password = app.config.get("SECRET_KEY").encode()
|
||||
|
||||
try:
|
||||
self._shared_encryptor = Fernet(self._derive_key(self._salt))
|
||||
except SaltLengthError as reason:
|
||||
raise EncryptionError(
|
||||
"DANGEROUS_SALT must be at least 16 bytes"
|
||||
) from reason
|
||||
|
||||
def encrypt(self, thing_to_encrypt, salt=None):
|
||||
"""Encrypt a string or object
|
||||
|
||||
thing_to_encrypt must be serializable as JSON
|
||||
Returns a UTF-8 string
|
||||
"""
|
||||
serialized_bytes = dumps(thing_to_encrypt).encode("utf-8")
|
||||
encrypted_bytes = self._encryptor(salt).encrypt(serialized_bytes)
|
||||
return encrypted_bytes.decode("utf-8")
|
||||
|
||||
def decrypt(self, thing_to_decrypt, salt=None):
|
||||
"""Decrypt a UTF-8 string or bytes.
|
||||
|
||||
Once decrypted, thing_to_decrypt must be deserializable from JSON.
|
||||
"""
|
||||
try:
|
||||
return loads(self._encryptor(salt).decrypt(thing_to_decrypt))
|
||||
except InvalidToken as reason:
|
||||
raise EncryptionError from reason
|
||||
|
||||
def sign(self, thing_to_sign, salt=None):
|
||||
return self._serializer.dumps(thing_to_sign, salt=(salt or self._salt))
|
||||
|
||||
def verify_signature(self, thing_to_verify, salt=None):
|
||||
try:
|
||||
return self._serializer.loads(thing_to_verify, salt=(salt or self._salt))
|
||||
except BadSignature as reason:
|
||||
raise EncryptionError from reason
|
||||
|
||||
def _encryptor(self, salt=None):
|
||||
if salt is None:
|
||||
return self._shared_encryptor
|
||||
else:
|
||||
try:
|
||||
return Fernet(self._derive_key(salt))
|
||||
except SaltLengthError as reason:
|
||||
raise EncryptionError(
|
||||
"Custom salt value must be at least 16 bytes"
|
||||
) from reason
|
||||
|
||||
def _derive_key(self, salt):
|
||||
"""Derive a key suitable for use within Fernet from the SECRET_KEY and salt
|
||||
|
||||
* For the salt to be secure, it must be 16 bytes or longer and randomly generated.
|
||||
* 600_000 was chosen for the iterations because it is what OWASP recommends as
|
||||
* of [February 2023](https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html#pbkdf2)
|
||||
* For more information, see https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#pbkdf2
|
||||
* and https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet
|
||||
"""
|
||||
salt_bytes = salt.encode()
|
||||
if len(salt_bytes) < 16:
|
||||
raise SaltLengthError
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(), length=32, salt=salt_bytes, iterations=600_000
|
||||
)
|
||||
return urlsafe_b64encode(kdf.derive(self._password))
|
||||
@@ -1,150 +0,0 @@
|
||||
import requests
|
||||
from flask import current_app
|
||||
|
||||
|
||||
class ZendeskError(Exception):
|
||||
def __init__(self, response):
|
||||
self.response = response
|
||||
|
||||
|
||||
class ZendeskClient:
|
||||
# the account used to authenticate with. If no requester is provided, the ticket will come from this account.
|
||||
NOTIFY_ZENDESK_EMAIL = "zd-api-notify@digital.cabinet-office.gov.uk"
|
||||
|
||||
ZENDESK_TICKET_URL = "https://govuk.zendesk.com/api/v2/tickets.json"
|
||||
|
||||
def __init__(self):
|
||||
self.api_key = None
|
||||
|
||||
def init_app(self, app, *args, **kwargs):
|
||||
self.api_key = app.config.get("ZENDESK_API_KEY")
|
||||
|
||||
def send_ticket_to_zendesk(self, ticket):
|
||||
response = requests.post(
|
||||
self.ZENDESK_TICKET_URL,
|
||||
json=ticket.request_data,
|
||||
auth=(f"{self.NOTIFY_ZENDESK_EMAIL}/token", self.api_key),
|
||||
)
|
||||
|
||||
if response.status_code != 201:
|
||||
current_app.logger.error(
|
||||
f"Zendesk create ticket request failed with {response.status_code} '{response.json()}'"
|
||||
)
|
||||
raise ZendeskError(response)
|
||||
|
||||
ticket_id = response.json()["ticket"]["id"]
|
||||
|
||||
current_app.logger.info(f"Zendesk create ticket {ticket_id} succeeded")
|
||||
|
||||
|
||||
class NotifySupportTicket:
|
||||
PRIORITY_URGENT = "urgent"
|
||||
PRIORITY_HIGH = "high"
|
||||
PRIORITY_NORMAL = "normal"
|
||||
PRIORITY_LOW = "low"
|
||||
|
||||
TAGS_P2 = "govuk_notify_support"
|
||||
TAGS_P1 = "govuk_notify_emergency"
|
||||
|
||||
TYPE_PROBLEM = "problem"
|
||||
TYPE_INCIDENT = "incident"
|
||||
TYPE_QUESTION = "question"
|
||||
TYPE_TASK = "task"
|
||||
|
||||
# Group: 3rd Line--Notify Support
|
||||
NOTIFY_GROUP_ID = 360000036529
|
||||
# Organization: GDS
|
||||
NOTIFY_ORG_ID = 21891972
|
||||
NOTIFY_TICKET_FORM_ID = 1900000284794
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
subject,
|
||||
message,
|
||||
ticket_type,
|
||||
p1=False,
|
||||
user_name=None,
|
||||
user_email=None,
|
||||
requester_sees_message_content=True,
|
||||
technical_ticket=False,
|
||||
ticket_categories=None,
|
||||
org_id=None,
|
||||
org_type=None,
|
||||
service_id=None,
|
||||
email_ccs=None,
|
||||
):
|
||||
self.subject = subject
|
||||
self.message = message
|
||||
self.ticket_type = ticket_type
|
||||
self.p1 = p1
|
||||
self.user_name = user_name
|
||||
self.user_email = user_email
|
||||
self.requester_sees_message_content = requester_sees_message_content
|
||||
self.technical_ticket = technical_ticket
|
||||
self.ticket_categories = ticket_categories or []
|
||||
self.org_id = org_id
|
||||
self.org_type = org_type
|
||||
self.service_id = service_id
|
||||
self.email_ccs = email_ccs
|
||||
|
||||
@property
|
||||
def request_data(self):
|
||||
data = {
|
||||
"ticket": {
|
||||
"subject": self.subject,
|
||||
"comment": {
|
||||
"body": self.message,
|
||||
"public": self.requester_sees_message_content,
|
||||
},
|
||||
"group_id": self.NOTIFY_GROUP_ID,
|
||||
"organization_id": self.NOTIFY_ORG_ID,
|
||||
"ticket_form_id": self.NOTIFY_TICKET_FORM_ID,
|
||||
"priority": self.PRIORITY_URGENT if self.p1 else self.PRIORITY_NORMAL,
|
||||
"tags": [self.TAGS_P1 if self.p1 else self.TAGS_P2],
|
||||
"type": self.ticket_type,
|
||||
"custom_fields": self._get_custom_fields(),
|
||||
}
|
||||
}
|
||||
|
||||
if self.email_ccs:
|
||||
data["ticket"]["email_ccs"] = [
|
||||
{"user_email": email, "action": "put"} for email in self.email_ccs
|
||||
]
|
||||
|
||||
# if no requester provided, then the call came from within Notify 👻
|
||||
if self.user_email:
|
||||
data["ticket"]["requester"] = {
|
||||
"email": self.user_email,
|
||||
"name": self.user_name or "(no name supplied)",
|
||||
}
|
||||
|
||||
return data
|
||||
|
||||
def _get_custom_fields(self):
|
||||
technical_ticket_tag = (
|
||||
f'notify_ticket_type_{"" if self.technical_ticket else "non_"}technical'
|
||||
)
|
||||
org_type_tag = f"notify_org_type_{self.org_type}" if self.org_type else None
|
||||
|
||||
return [
|
||||
{
|
||||
"id": "1900000744994",
|
||||
"value": technical_ticket_tag,
|
||||
}, # Notify Ticket type field
|
||||
{
|
||||
"id": "360022836500",
|
||||
"value": self.ticket_categories,
|
||||
}, # Notify Ticket category field
|
||||
{
|
||||
"id": "360022943959",
|
||||
"value": self.org_id,
|
||||
}, # Notify Organisation ID field
|
||||
{
|
||||
"id": "360022943979",
|
||||
"value": org_type_tag,
|
||||
}, # Notify Organisation type field
|
||||
{
|
||||
"id": "1900000745014",
|
||||
"value": self.service_id,
|
||||
}, # Notify Service ID field
|
||||
]
|
||||
@@ -1,71 +0,0 @@
|
||||
import io
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from notifications_utils.clients.antivirus.antivirus_client import (
|
||||
AntivirusClient,
|
||||
AntivirusError,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def antivirus(app, mocker):
|
||||
client = AntivirusClient()
|
||||
app.config["ANTIVIRUS_API_HOST"] = "https://antivirus"
|
||||
app.config["ANTIVIRUS_API_KEY"] = "test-antivirus-key"
|
||||
client.init_app(app)
|
||||
return client
|
||||
|
||||
|
||||
def test_scan_document(antivirus, rmock):
|
||||
document = io.BytesIO(b"filecontents")
|
||||
rmock.request(
|
||||
"POST",
|
||||
"https://antivirus/scan",
|
||||
json={"ok": True},
|
||||
request_headers={
|
||||
"Authorization": "Bearer test-antivirus-key",
|
||||
},
|
||||
status_code=200,
|
||||
)
|
||||
|
||||
resp = antivirus.scan(document)
|
||||
|
||||
assert resp
|
||||
assert "filecontents" in rmock.last_request.text
|
||||
assert document.tell() == 0
|
||||
|
||||
|
||||
def test_should_raise_for_status(antivirus, rmock):
|
||||
with pytest.raises(AntivirusError) as excinfo:
|
||||
_test_one_statement_for_status(antivirus, rmock)
|
||||
|
||||
assert excinfo.value.message == "Antivirus error"
|
||||
assert excinfo.value.status_code == 400
|
||||
|
||||
|
||||
def _test_one_statement_for_status(antivirus, rmock):
|
||||
rmock.request(
|
||||
"POST",
|
||||
"https://antivirus/scan",
|
||||
json={"error": "Antivirus error"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
antivirus.scan(io.BytesIO(b"document"))
|
||||
|
||||
|
||||
def test_should_raise_for_connection_errors(antivirus, rmock):
|
||||
with pytest.raises(AntivirusError) as excinfo:
|
||||
_test_one_statement_for_connection_errors(antivirus, rmock)
|
||||
|
||||
assert excinfo.value.message == "connection error"
|
||||
assert excinfo.value.status_code == 503
|
||||
|
||||
|
||||
def _test_one_statement_for_connection_errors(antivirus, rmock):
|
||||
rmock.request(
|
||||
"POST", "https://antivirus/scan", exc=requests.exceptions.ConnectTimeout
|
||||
)
|
||||
antivirus.scan(io.BytesIO(b"document"))
|
||||
@@ -1,88 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from notifications_utils.clients.encryption.encryption_client import (
|
||||
Encryption,
|
||||
EncryptionError,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def encryption_client(app):
|
||||
client = Encryption()
|
||||
|
||||
app.config["SECRET_KEY"] = "test-notify-secret-key"
|
||||
app.config["DANGEROUS_SALT"] = "test-notify-salt"
|
||||
|
||||
client.init_app(app)
|
||||
|
||||
return client
|
||||
|
||||
|
||||
def test_should_ensure_shared_salt_security(app):
|
||||
client = Encryption()
|
||||
app.config["SECRET_KEY"] = "test-notify-secret-key"
|
||||
app.config["DANGEROUS_SALT"] = "too-short"
|
||||
with pytest.raises(EncryptionError):
|
||||
client.init_app(app)
|
||||
|
||||
|
||||
def test_should_ensure_custom_salt_security(encryption_client):
|
||||
with pytest.raises(EncryptionError):
|
||||
encryption_client.encrypt("this", salt="too-short")
|
||||
|
||||
|
||||
def test_should_encrypt_strings(encryption_client):
|
||||
encrypted = encryption_client.encrypt("this")
|
||||
assert encrypted != "this"
|
||||
assert isinstance(encrypted, str)
|
||||
|
||||
|
||||
def test_should_encrypt_dicts(encryption_client):
|
||||
to_encrypt = {"hello": "world"}
|
||||
encrypted = encryption_client.encrypt(to_encrypt)
|
||||
assert encrypted != to_encrypt
|
||||
assert encryption_client.decrypt(encrypted) == to_encrypt
|
||||
|
||||
|
||||
def test_encryption_is_nondeterministic(encryption_client):
|
||||
first_run = encryption_client.encrypt("this")
|
||||
second_run = encryption_client.encrypt("this")
|
||||
assert first_run != second_run
|
||||
|
||||
|
||||
def test_should_decrypt_content(encryption_client):
|
||||
encrypted = encryption_client.encrypt("this")
|
||||
assert encryption_client.decrypt(encrypted) == "this"
|
||||
|
||||
|
||||
def test_should_decrypt_content_with_custom_salt(encryption_client):
|
||||
salt = "different-salt-value"
|
||||
encrypted = encryption_client.encrypt("this", salt=salt)
|
||||
assert encryption_client.decrypt(encrypted, salt=salt) == "this"
|
||||
|
||||
|
||||
def test_should_verify_decryption(encryption_client):
|
||||
encrypted = encryption_client.encrypt("this")
|
||||
with pytest.raises(EncryptionError):
|
||||
encryption_client.decrypt(encrypted, salt="different-salt-value")
|
||||
|
||||
|
||||
def test_should_sign_and_serialize_string(encryption_client):
|
||||
signed = encryption_client.sign("this")
|
||||
assert signed != "this"
|
||||
|
||||
|
||||
def test_should_verify_signature_and_deserialize_string(encryption_client):
|
||||
signed = encryption_client.sign("this")
|
||||
assert encryption_client.verify_signature(signed) == "this"
|
||||
|
||||
|
||||
def test_should_raise_encryption_error_on_bad_salt(encryption_client):
|
||||
signed = encryption_client.sign("this")
|
||||
with pytest.raises(EncryptionError):
|
||||
encryption_client.verify_signature(signed, salt="different-salt-value")
|
||||
|
||||
|
||||
def test_should_sign_and_serialize_json(encryption_client):
|
||||
signed = encryption_client.sign({"this": "that"})
|
||||
assert encryption_client.verify_signature(signed) == {"this": "that"}
|
||||
@@ -1,227 +0,0 @@
|
||||
from base64 import b64decode
|
||||
|
||||
import pytest
|
||||
|
||||
from notifications_utils.clients.zendesk.zendesk_client import (
|
||||
NotifySupportTicket,
|
||||
ZendeskClient,
|
||||
ZendeskError,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def zendesk_client(app):
|
||||
client = ZendeskClient()
|
||||
|
||||
app.config["ZENDESK_API_KEY"] = "testkey"
|
||||
|
||||
client.init_app(app)
|
||||
|
||||
return client
|
||||
|
||||
|
||||
def test_zendesk_client_send_ticket_to_zendesk(zendesk_client, app, mocker, rmock):
|
||||
rmock.request(
|
||||
"POST",
|
||||
ZendeskClient.ZENDESK_TICKET_URL,
|
||||
status_code=201,
|
||||
json={
|
||||
"ticket": {
|
||||
"id": 12345,
|
||||
"subject": "Something is wrong",
|
||||
}
|
||||
},
|
||||
)
|
||||
mock_logger = mocker.patch.object(app.logger, "info")
|
||||
|
||||
ticket = NotifySupportTicket("subject", "message", "incident")
|
||||
zendesk_client.send_ticket_to_zendesk(ticket)
|
||||
|
||||
assert rmock.last_request.headers["Authorization"][:6] == "Basic "
|
||||
b64_auth = rmock.last_request.headers["Authorization"][6:]
|
||||
assert (
|
||||
b64decode(b64_auth.encode()).decode()
|
||||
== "zd-api-notify@digital.cabinet-office.gov.uk/token:testkey"
|
||||
)
|
||||
assert rmock.last_request.json() == ticket.request_data
|
||||
mock_logger.assert_called_once_with("Zendesk create ticket 12345 succeeded")
|
||||
|
||||
|
||||
def test_zendesk_client_send_ticket_to_zendesk_error(
|
||||
zendesk_client, app, mocker, rmock
|
||||
):
|
||||
rmock.request(
|
||||
"POST", ZendeskClient.ZENDESK_TICKET_URL, status_code=401, json={"foo": "bar"}
|
||||
)
|
||||
|
||||
mock_logger = mocker.patch.object(app.logger, "error")
|
||||
|
||||
ticket = NotifySupportTicket("subject", "message", "incident")
|
||||
|
||||
with pytest.raises(ZendeskError):
|
||||
zendesk_client.send_ticket_to_zendesk(ticket)
|
||||
|
||||
mock_logger.assert_called_with(
|
||||
"Zendesk create ticket request failed with 401 '{'foo': 'bar'}'"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("p1_arg", "expected_tags", "expected_priority"),
|
||||
[
|
||||
(
|
||||
{},
|
||||
["govuk_notify_support"],
|
||||
"normal",
|
||||
),
|
||||
(
|
||||
{
|
||||
"p1": False,
|
||||
},
|
||||
["govuk_notify_support"],
|
||||
"normal",
|
||||
),
|
||||
(
|
||||
{
|
||||
"p1": True,
|
||||
},
|
||||
["govuk_notify_emergency"],
|
||||
"urgent",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_notify_support_ticket_request_data(p1_arg, expected_tags, expected_priority):
|
||||
notify_ticket_form = NotifySupportTicket("subject", "message", "question", **p1_arg)
|
||||
|
||||
assert notify_ticket_form.request_data == {
|
||||
"ticket": {
|
||||
"subject": "subject",
|
||||
"comment": {
|
||||
"body": "message",
|
||||
"public": True,
|
||||
},
|
||||
"group_id": NotifySupportTicket.NOTIFY_GROUP_ID,
|
||||
"organization_id": NotifySupportTicket.NOTIFY_ORG_ID,
|
||||
"ticket_form_id": NotifySupportTicket.NOTIFY_TICKET_FORM_ID,
|
||||
"priority": expected_priority,
|
||||
"tags": expected_tags,
|
||||
"type": "question",
|
||||
"custom_fields": [
|
||||
{"id": "1900000744994", "value": "notify_ticket_type_non_technical"},
|
||||
{"id": "360022836500", "value": []},
|
||||
{"id": "360022943959", "value": None},
|
||||
{"id": "360022943979", "value": None},
|
||||
{"id": "1900000745014", "value": None},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_notify_support_ticket_request_data_with_message_hidden_from_requester():
|
||||
notify_ticket_form = NotifySupportTicket(
|
||||
"subject", "message", "problem", requester_sees_message_content=False
|
||||
)
|
||||
|
||||
assert notify_ticket_form.request_data["ticket"]["comment"]["public"] is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("name", "zendesk_name"), [("Name", "Name"), (None, "(no name supplied)")]
|
||||
)
|
||||
def test_notify_support_ticket_request_data_with_user_name_and_email(
|
||||
name, zendesk_name
|
||||
):
|
||||
notify_ticket_form = NotifySupportTicket(
|
||||
"subject", "message", "question", user_name=name, user_email="user@example.com"
|
||||
)
|
||||
|
||||
assert (
|
||||
notify_ticket_form.request_data["ticket"]["requester"]["email"]
|
||||
== "user@example.com"
|
||||
)
|
||||
assert (
|
||||
notify_ticket_form.request_data["ticket"]["requester"]["name"] == zendesk_name
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
(
|
||||
"custom_fields",
|
||||
"tech_ticket_tag",
|
||||
"categories",
|
||||
"org_id",
|
||||
"org_type",
|
||||
"service_id",
|
||||
),
|
||||
[
|
||||
(
|
||||
{"technical_ticket": True},
|
||||
"notify_ticket_type_technical",
|
||||
[],
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
(
|
||||
{"technical_ticket": False},
|
||||
"notify_ticket_type_non_technical",
|
||||
[],
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
(
|
||||
{"ticket_categories": ["notify_billing", "notify_bug"]},
|
||||
"notify_ticket_type_non_technical",
|
||||
["notify_billing", "notify_bug"],
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
(
|
||||
{"org_id": "1234", "org_type": "local"},
|
||||
"notify_ticket_type_non_technical",
|
||||
[],
|
||||
"1234",
|
||||
"notify_org_type_local",
|
||||
None,
|
||||
),
|
||||
(
|
||||
{"service_id": "abcd", "org_type": "nhs"},
|
||||
"notify_ticket_type_non_technical",
|
||||
[],
|
||||
None,
|
||||
"notify_org_type_nhs",
|
||||
"abcd",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_notify_support_ticket_request_data_custom_fields(
|
||||
custom_fields,
|
||||
tech_ticket_tag,
|
||||
categories,
|
||||
org_id,
|
||||
org_type,
|
||||
service_id,
|
||||
):
|
||||
notify_ticket_form = NotifySupportTicket(
|
||||
"subject", "message", "question", **custom_fields
|
||||
)
|
||||
|
||||
assert notify_ticket_form.request_data["ticket"]["custom_fields"] == [
|
||||
{"id": "1900000744994", "value": tech_ticket_tag},
|
||||
{"id": "360022836500", "value": categories},
|
||||
{"id": "360022943959", "value": org_id},
|
||||
{"id": "360022943979", "value": org_type},
|
||||
{"id": "1900000745014", "value": service_id},
|
||||
]
|
||||
|
||||
|
||||
def test_notify_support_ticket_request_data_email_ccs():
|
||||
notify_ticket_form = NotifySupportTicket(
|
||||
"subject", "message", "question", email_ccs=["someone@example.com"]
|
||||
)
|
||||
|
||||
assert notify_ticket_form.request_data["ticket"]["email_ccs"] == [
|
||||
{"user_email": "someone@example.com", "action": "put"},
|
||||
]
|
||||
Reference in New Issue
Block a user