diff --git a/requirements.txt b/requirements.txt index 262f8162f..672bdd706 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,8 @@ statsd==3.2.1 jsonschema==2.5.1 Flask-Redis==0.1.0 gunicorn==19.6.0 +docopt==0.6.2 + # pin to minor version 3.1.x notifications-python-client>=3.1,<3.2 diff --git a/scripts/run_celery.sh b/scripts/run_celery.sh index 8b2a1dec2..8830d2491 100755 --- a/scripts/run_celery.sh +++ b/scripts/run_celery.sh @@ -3,4 +3,4 @@ set -e source environment.sh -celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4 +celery -A run_celery.notify_celery worker --pidfile="/tmp/celery.pid" --loglevel=INFO --concurrency=4 diff --git a/scripts/stop_celery.py b/scripts/stop_celery.py new file mode 100755 index 000000000..71b10ec03 --- /dev/null +++ b/scripts/stop_celery.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +""" + +Scipt used to stop celery in AWS environments. +This is used from upstart to issue a TERM signal to the master celery process. + +This will then allow the worker threads to stop, after completing +whatever tasks that are in flight. + +Note the script blocks for up to 15minutes, which is long enough to allow our +longest possible task to complete. If it can return quicker it will. + +Usage: + ./stop_celery.py + +Example: + ./stop_celery.py /tmp/celery.pid +""" + +import os +from docopt import docopt +import re +import subprocess +from time import sleep + + +def strip_white_space(from_this): + return re.sub(r'\s+', '', from_this) + + +def get_pid_from_file(filename): + """ + Open the file which MUST contain only the PID of the master celery process. + This is written to disk by the start celery command issued by upstart + """ + with open(filename) as f: + celery_pid = f.read() + return strip_white_space(celery_pid) + + +def issue_term_signal_to_pid(pid, celery_pid_file): + """ + Issues a TERM signal (15) to the master celery process. + + This method attempts to print out any response from this subprocess call. However this call is generally silent. + """ + print("Trying to stop ", celery_pid_file) + result = subprocess.Popen(['kill', '-15', pid], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + for line in result.stdout.readlines(): + print(line.rstrip()) + for line in result.stderr.readlines(): + print(line.rstrip()) + + +def pid_still_running(pid): + """ + uses the proc filesystem to identify if the celery master pid is still around. + + Once the process stops this file no longer exists. Slim possibilty of a race condition here. + """ + return os.path.exists("/proc/" + pid) + + +if __name__ == "__main__": + arguments = docopt(__doc__) + celery_pid_file = arguments[''] + + celery_pid = get_pid_from_file(celery_pid_file) + + issue_term_signal_to_pid(celery_pid, celery_pid_file) + + """ + Blocking loop to check for the still running process. + 5 seconds between loops + 180 loops + Maximum block time of 900 seconds (15 minutes) + """ + iteration = 0 + while pid_still_running(celery_pid) and iteration < 180: + print("[", celery_pid_file, "] waited for ", iteration * 5, " secs") + sleep(5) + iteration += 1