mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-05-04 16:11:11 -04:00
@@ -41,7 +41,7 @@ def register():
|
|||||||
def register_from_org_invite():
|
def register_from_org_invite():
|
||||||
invited_org_user = InvitedOrgUser.from_session()
|
invited_org_user = InvitedOrgUser.from_session()
|
||||||
if not invited_org_user:
|
if not invited_org_user:
|
||||||
abort(404)
|
abort(404, "No invited_org_user")
|
||||||
|
|
||||||
form = RegisterUserFromOrgInviteForm(
|
form = RegisterUserFromOrgInviteForm(
|
||||||
invited_org_user,
|
invited_org_user,
|
||||||
@@ -53,7 +53,7 @@ def register_from_org_invite():
|
|||||||
form.organization.data != invited_org_user.organization
|
form.organization.data != invited_org_user.organization
|
||||||
or form.email_address.data != invited_org_user.email_address
|
or form.email_address.data != invited_org_user.email_address
|
||||||
):
|
):
|
||||||
abort(400)
|
abort(400, "organization or email address doesn't match")
|
||||||
_do_registration(
|
_do_registration(
|
||||||
form,
|
form,
|
||||||
send_email=False,
|
send_email=False,
|
||||||
@@ -104,9 +104,15 @@ def registration_continue():
|
|||||||
|
|
||||||
def get_invite_data_from_redis(state):
|
def get_invite_data_from_redis(state):
|
||||||
|
|
||||||
invite_data = json.loads(redis_client.get(f"invitedata-{state}"))
|
invite_data = json.loads(redis_client.get(f"invitedata-{state}").decode("utf8"))
|
||||||
user_email = redis_client.get(f"user_email-{state}").decode("utf8")
|
user_email = redis_client.get(f"user_email-{state}").decode("utf8")
|
||||||
user_uuid = redis_client.get(f"user_uuid-{state}").decode("utf8")
|
user_uuid = redis_client.get(f"user_uuid-{state}").decode("utf8")
|
||||||
|
|
||||||
|
# login.gov is going to fail if we don't have at least one of these
|
||||||
|
if user_email is None and user_uuid is None:
|
||||||
|
flash("Can't find user email and/or uuid")
|
||||||
|
abort(403, "Can't find user email and/or uuid")
|
||||||
|
|
||||||
invited_user_email_address = redis_client.get(
|
invited_user_email_address = redis_client.get(
|
||||||
f"invited_user_email_address-{state}"
|
f"invited_user_email_address-{state}"
|
||||||
).decode("utf8")
|
).decode("utf8")
|
||||||
@@ -116,7 +122,7 @@ def get_invite_data_from_redis(state):
|
|||||||
def put_invite_data_in_redis(
|
def put_invite_data_in_redis(
|
||||||
state, invite_data, user_email, user_uuid, invited_user_email_address
|
state, invite_data, user_email, user_uuid, invited_user_email_address
|
||||||
):
|
):
|
||||||
ttl = 60 * 15 # 15 minutes
|
ttl = 2 * 24 * 60 * 60
|
||||||
|
|
||||||
redis_client.set(f"invitedata-{state}", json.dumps(invite_data), ex=ttl)
|
redis_client.set(f"invitedata-{state}", json.dumps(invite_data), ex=ttl)
|
||||||
redis_client.set(f"user_email-{state}", user_email, ex=ttl)
|
redis_client.set(f"user_email-{state}", user_email, ex=ttl)
|
||||||
@@ -132,14 +138,12 @@ def check_invited_user_email_address_matches_expected(
|
|||||||
user_email, invited_user_email_address
|
user_email, invited_user_email_address
|
||||||
):
|
):
|
||||||
if user_email.lower() != invited_user_email_address.lower():
|
if user_email.lower() != invited_user_email_address.lower():
|
||||||
debug_msg("invited user email did not match expected email, abort(403)")
|
|
||||||
flash("You cannot accept an invite for another person.")
|
flash("You cannot accept an invite for another person.")
|
||||||
abort(403)
|
abort(403, "You cannot accept an invite for another person #invite")
|
||||||
|
|
||||||
if not is_gov_user(user_email):
|
if not is_gov_user(user_email):
|
||||||
debug_msg("invited user has a non-government email address.")
|
|
||||||
flash("You must use a government email address.")
|
flash("You must use a government email address.")
|
||||||
abort(403)
|
abort(403, "You must use a government email address #invites")
|
||||||
|
|
||||||
|
|
||||||
@main.route("/set-up-your-profile", methods=["GET", "POST"])
|
@main.route("/set-up-your-profile", methods=["GET", "POST"])
|
||||||
@@ -153,10 +157,13 @@ def set_up_your_profile():
|
|||||||
state_key = f"login-state-{unquote(state)}"
|
state_key = f"login-state-{unquote(state)}"
|
||||||
stored_state = unquote(redis_client.get(state_key).decode("utf8"))
|
stored_state = unquote(redis_client.get(state_key).decode("utf8"))
|
||||||
if state != stored_state:
|
if state != stored_state:
|
||||||
current_app.logger.error(f"#invites State Error: {state} != {stored_state}")
|
flash("Internal error: cannot recognize stored state")
|
||||||
abort(403)
|
abort(403, "Internal error: cannot recognize stored state #invites")
|
||||||
|
|
||||||
login_gov_error = request.args.get("error")
|
login_gov_error = request.args.get("error")
|
||||||
|
if login_gov_error:
|
||||||
|
flash(f"Login.gov error: {login_gov_error}")
|
||||||
|
abort(403, f"login_gov_error {login_gov_error} #invites")
|
||||||
|
|
||||||
user_email = redis_client.get(f"user_email-{state}")
|
user_email = redis_client.get(f"user_email-{state}")
|
||||||
user_uuid = redis_client.get(f"user_uuid-{state}")
|
user_uuid = redis_client.get(f"user_uuid-{state}")
|
||||||
@@ -186,7 +193,8 @@ def set_up_your_profile():
|
|||||||
|
|
||||||
invited_user_accept_invite(invited_user_id)
|
invited_user_accept_invite(invited_user_id)
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
f"accepted invite user with invited_user_id {invited_user_id} to service {invite_data['service_id']}"
|
f"#invites: accepted invite user with invited_user_id \
|
||||||
|
{invited_user_id} to service {invite_data['service_id']}"
|
||||||
)
|
)
|
||||||
# We need to avoid taking a second trip through the login.gov code because we cannot pull the
|
# We need to avoid taking a second trip through the login.gov code because we cannot pull the
|
||||||
# access token twice. So once we retrieve these values, let's park them in redis for 15 minutes
|
# access token twice. So once we retrieve these values, let's park them in redis for 15 minutes
|
||||||
@@ -244,10 +252,6 @@ def set_up_your_profile():
|
|||||||
current_app.logger.info(f"#invites redirecting to {url}")
|
current_app.logger.info(f"#invites redirecting to {url}")
|
||||||
return redirect(url)
|
return redirect(url)
|
||||||
|
|
||||||
elif login_gov_error:
|
|
||||||
current_app.logger.error(f"#invites: login.gov error: {login_gov_error}")
|
|
||||||
abort(403)
|
|
||||||
|
|
||||||
# we take two trips through this method, but should only hit this
|
# we take two trips through this method, but should only hit this
|
||||||
# line on the first trip. On the second trip, we should get redirected
|
# line on the first trip. On the second trip, we should get redirected
|
||||||
# to the accounts page because we have successfully registered.
|
# to the accounts page because we have successfully registered.
|
||||||
@@ -269,14 +273,14 @@ def invited_user_accept_invite(invited_user_id):
|
|||||||
flash(
|
flash(
|
||||||
"Your invitation has expired; please contact the person who invited you for additional help."
|
"Your invitation has expired; please contact the person who invited you for additional help."
|
||||||
)
|
)
|
||||||
abort(401)
|
abort(401, "Your invitation has expired #invites")
|
||||||
|
|
||||||
if invited_user.status == InvitedUserStatus.CANCELLED:
|
if invited_user.status == InvitedUserStatus.CANCELLED:
|
||||||
current_app.logger.error("User invitation has been cancelled")
|
current_app.logger.error("User invitation has been cancelled")
|
||||||
flash(
|
flash(
|
||||||
"Your invitation is no longer valid; please contact the person who invited you for additional help."
|
"Your invitation is no longer valid; please contact the person who invited you for additional help."
|
||||||
)
|
)
|
||||||
abort(401)
|
abort(401, "Your invitation was canceled #invites")
|
||||||
|
|
||||||
invited_user.accept_invite()
|
invited_user.accept_invite()
|
||||||
|
|
||||||
|
|||||||
@@ -394,7 +394,10 @@ def test_check_invited_user_email_address_doesnt_match_expected(mocker):
|
|||||||
mock_flash.assert_called_once_with(
|
mock_flash.assert_called_once_with(
|
||||||
"You cannot accept an invite for another person."
|
"You cannot accept an invite for another person."
|
||||||
)
|
)
|
||||||
mock_abort.assert_called_once_with(403)
|
mock_abort.assert_called_once_with(
|
||||||
|
403,
|
||||||
|
"You cannot accept an invite for another person #invite",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_check_user_email_address_fails_if_not_government_address(mocker):
|
def test_check_user_email_address_fails_if_not_government_address(mocker):
|
||||||
@@ -405,7 +408,10 @@ def test_check_user_email_address_fails_if_not_government_address(mocker):
|
|||||||
"fake@fake.bogus", "Fake@Fake.BOGUS"
|
"fake@fake.bogus", "Fake@Fake.BOGUS"
|
||||||
)
|
)
|
||||||
mock_flash.assert_called_once_with("You must use a government email address.")
|
mock_flash.assert_called_once_with("You must use a government email address.")
|
||||||
mock_abort.assert_called_once_with(403)
|
mock_abort.assert_called_once_with(
|
||||||
|
403,
|
||||||
|
"You must use a government email address #invites",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_check_user_email_address_succeeds_if_government_address(mocker):
|
def test_check_user_email_address_succeeds_if_government_address(mocker):
|
||||||
|
|||||||
Reference in New Issue
Block a user