Files
notifications-api/app/dao/users_dao.py

205 lines
5.7 KiB
Python
Raw Normal View History

import uuid
2021-03-10 13:55:06 +00:00
from datetime import datetime, timedelta
from secrets import randbelow
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from app import db
from app.dao.dao_utils import autocommit
from app.dao.permissions_dao import permission_dao
from app.dao.service_user_dao import dao_get_service_users_by_user_id
from app.enums import AuthType
from app.errors import InvalidRequest
from app.models import User, VerifyCode
from app.utils import escape_special_characters, get_archived_db_column_value
2017-06-13 18:11:13 +01:00
2016-01-21 17:29:24 +00:00
def _remove_values_for_keys_if_present(dict, keys):
    """Delete every key listed in ``keys`` from ``dict``, in place.

    Keys that are not present are skipped silently. Nothing is returned.

    Note: the first parameter shadows the built-in ``dict``; the name is kept
    because it is part of the existing signature.
    """
    for unwanted in keys:
        if unwanted in dict:
            del dict[unwanted]
def create_secret_code(length=6):
    """Return a random numeric code as a zero-padded string.

    A number below ``10**length`` is drawn with ``secrets.randbelow`` and
    left-padded with zeros to ``length`` digits.
    """
    upper_bound = 10**length
    return f"{randbelow(upper_bound):0{length}d}"
def save_user_attribute(usr, update_dict=None):
    """Apply ``update_dict`` to the ``User`` row with id ``usr.id`` and commit.

    A missing or empty ``update_dict`` is passed to the update as ``{}``.
    """
    changes = update_dict if update_dict else {}
    matching_users = db.session.query(User).filter_by(id=usr.id)
    matching_users.update(changes)
    db.session.commit()
2023-08-29 14:54:30 -07:00
def save_model_user(
    user, update_dict=None, password=None, validated_email_access=False
):
    """Persist changes to ``user`` and commit the session.

    :param user: the ``User`` instance being saved.
    :param update_dict: optional mapping of column values to write with a
        bulk update filtered on ``user.id``. The keys ``id`` and
        ``password_changed_at`` are ignored. The mapping passed in is not
        modified.
    :param password: when truthy, set as the user's password and
        ``password_changed_at`` is stamped with the current UTC time.
    :param validated_email_access: when truthy, ``email_access_validated_at``
        is stamped with the current UTC time.

    When ``update_dict`` is empty or ``None`` the ``user`` object itself is
    added to the session instead of issuing a bulk update.
    """
    if password:
        user.password = password
        user.password_changed_at = datetime.utcnow()
    if validated_email_access:
        user.email_access_validated_at = datetime.utcnow()
    if update_dict:
        # Filter a copy so the caller's dictionary is left untouched.
        values = dict(update_dict)
        _remove_values_for_keys_if_present(values, ["id", "password_changed_at"])
        db.session.query(User).filter_by(id=user.id).update(values)
    else:
        db.session.add(user)
    db.session.commit()
2016-01-21 17:29:24 +00:00
def create_user_code(user, code, code_type):
    """Create, commit and return a ``VerifyCode`` for ``user``.

    The expiry is set to 30 minutes after the current UTC time.
    """
    expires_at = datetime.utcnow() + timedelta(minutes=30)
    verify_code = VerifyCode(
        code_type=code_type, expiry_datetime=expires_at, user=user
    )
    # The code is assigned as an attribute rather than passed to the constructor.
    verify_code.code = code

    db.session.add(verify_code)
    db.session.commit()
    return verify_code
def get_user_code(user, code, code_type):
    """Return the first of the user's codes that ``check_code(code)`` accepts.

    Only codes of ``code_type`` belonging to ``user`` are considered.
    Returns ``None`` when no code matches.
    """
    # Newest codes come first, to try to reduce the time spent searching
    # for the correct one.
    newest_first = VerifyCode.query.filter_by(
        user=user, code_type=code_type
    ).order_by(VerifyCode.created_at.desc())

    for candidate in newest_first:
        if candidate.check_code(code):
            return candidate
    return None
2016-01-21 17:29:24 +00:00
def delete_codes_older_created_more_than_a_day_ago():
    """Delete ``VerifyCode`` rows created more than 24 hours ago and commit.

    Returns whatever the query's ``delete()`` call returns (presumably the
    number of rows matched - confirm against the SQLAlchemy version in use).
    """
    cutoff = datetime.utcnow() - timedelta(hours=24)
    stale_codes = db.session.query(VerifyCode).filter(VerifyCode.created_at < cutoff)
    deleted = stale_codes.delete()
    db.session.commit()
    return deleted
2016-01-21 17:29:24 +00:00
def use_user_code(id):
    """Flag the ``VerifyCode`` looked up by ``id`` as used and commit.

    NOTE(review): there is no handling for a lookup that finds nothing;
    presumably ``query.get`` returns ``None`` in that case and the attribute
    assignment would fail - confirm callers always pass a valid id.
    """
    used_code = VerifyCode.query.get(id)
    used_code.code_used = True

    db.session.add(used_code)
    db.session.commit()
2016-01-12 10:39:49 +00:00
def delete_model_user(user):
    """Delete ``user`` through the session and commit."""
    session = db.session
    session.delete(user)
    session.commit()
def delete_user_verify_codes(user):
    """Delete every ``VerifyCode`` row belonging to ``user`` and commit."""
    codes_for_user = VerifyCode.query.filter_by(user=user)
    codes_for_user.delete()
    db.session.commit()
def count_user_verify_codes(user):
    """Count the user's verify codes that are unexpired and not yet used.

    A code counts as unexpired when its ``expiry_datetime`` is later than
    the current UTC time.
    """
    now = datetime.utcnow()
    return VerifyCode.query.filter(
        VerifyCode.user == user,
        VerifyCode.expiry_datetime > now,
        VerifyCode.code_used.is_(False),
    ).count()
def get_user_by_id(user_id=None):
    """Fetch one user by id, or all users when no id is given.

    With a truthy ``user_id`` the result of ``.one()`` on the id-filtered
    query is returned. With a falsy ``user_id`` the result of ``.all()`` on
    the unfiltered query is returned.
    """
    if not user_id:
        return User.query.filter_by().all()
    return User.query.filter_by(id=user_id).one()
def get_users():
    """Return the result of ``User.query.all()``."""
    all_users = User.query.all()
    return all_users
def get_user_by_email(email):
    """Look up a single user by email address, ignoring case.

    Both the stored address and ``email`` are lower-cased in the query
    before comparison. The result of ``.one()`` is returned.
    """
    stored_address = func.lower(User.email_address)
    wanted_address = func.lower(email)
    return User.query.filter(stored_address == wanted_address).one()
def get_users_by_partial_email(email):
    """Return all users whose email address contains ``email`` (via ``ilike``).

    ``email`` is passed through ``escape_special_characters`` before being
    wrapped in ``%`` wildcards.
    """
    pattern = f"%{escape_special_characters(email)}%"
    return User.query.filter(User.email_address.ilike(pattern)).all()
def increment_failed_login_count(user):
    """Add one to ``user.failed_login_count`` and commit."""
    user.failed_login_count = user.failed_login_count + 1

    session = db.session
    session.add(user)
    session.commit()
def reset_failed_login_count(user):
    """Set ``user.failed_login_count`` back to zero and commit.

    Nothing is written when the count is not above zero.
    """
    if not user.failed_login_count > 0:
        return

    user.failed_login_count = 0
    db.session.add(user)
    db.session.commit()
def update_user_password(user, password):
    """Set a new password on ``user``, stamp the change time (UTC) and commit.

    NOTE(review): the previous comment on this function said the failed
    login count is reset here, but nothing in the body touches
    ``failed_login_count``. Confirm whether callers are expected to call
    ``reset_failed_login_count`` themselves.
    """
    user.password = password
    user.password_changed_at = datetime.utcnow()

    session = db.session
    session.add(user)
    session.commit()
def get_user_and_accounts(user_id):
    """Fetch one user by id with services and organizations joined-loaded.

    The result of ``.one()`` on the id-filtered query is returned.
    """
    # Eagerly load the user's services and organizations, and also the
    # service's org and vice versa (so we can see if the user knows about it).
    eager_loads = (
        joinedload("services"),
        joinedload("organizations"),
        joinedload("organizations.services"),
        joinedload("services.organization"),
    )
    return User.query.filter(User.id == user_id).options(*eager_loads).one()
@autocommit
def dao_archive_user(user):
    """Archive ``user``: detach them from services and orgs and deactivate them.

    Raises ``InvalidRequest`` (400) when ``user_can_be_archived(user)`` is
    false. Otherwise the user's service permissions and service-user rows are
    removed, their organizations are cleared, and their login details are
    overwritten before their state is set to ``"inactive"``.

    No commit is issued in the body; that is left to the ``autocommit``
    decorator (defined in ``app.dao.dao_utils``, not shown here).
    """
    if not user_can_be_archived(user):
        raise InvalidRequest(
            "User cant be removed from a service - check all services have another team member with manage_settings",
            400,
        )

    permission_dao.remove_user_service_permissions_for_all_services(user)

    for membership in dao_get_service_users_by_user_id(user.id):
        db.session.delete(membership)

    user.organizations = []

    user.auth_type = AuthType.EMAIL
    user.email_address = get_archived_db_column_value(user.email_address)
    user.mobile_number = None
    # The password is replaced with a freshly generated UUID string.
    user.password = str(uuid.uuid4())
    # Changing the current_session_id signs the user out
    user.current_session_id = "00000000-0000-0000-0000-000000000000"
    user.state = "inactive"

    db.session.add(user)
def user_can_be_archived(user):
    """Return whether every active service of ``user`` can do without them.

    For each of the user's services whose ``active`` flag is truthy, at
    least one other user with state ``"active"`` must have
    ``"manage_settings"`` among their permissions for that service.
    Returns ``True`` when the user has no active services.
    """

    def _has_other_settings_manager(service):
        # Other active members of the service, excluding the user being archived.
        colleagues = [
            member
            for member in service.users
            if member.state == "active" and member != user
        ]
        # An empty list gives False here: nobody else is left on the service.
        return any(
            "manage_settings" in colleague.get_permissions(service.id)
            for colleague in colleagues
        )

    active_services = [service for service in user.services if service.active]
    return all(_has_other_settings_manager(service) for service in active_services)