
Create a Long-Running-Process Server for Django

Friday, March 6th, 2009

Have you ever wanted to put up a server made exclusively to handle the expensive actions in your web app? In this article I will show you how to create a server that receives commands from your web server and processes them in the context of your Django web app. The same technique could be used in many other places, such as a desktop GUI application or a web service architecture.

Say you have a work unit WU which takes pickleable arguments (a, b, c). If your work unit is self-contained (it modifies no global variables and causes no side effects in your application), it can be performed in another process, or even on another machine, allowing you to return control to the user immediately.
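
To make the idea concrete, here is a minimal Python 3 sketch. The work unit `work_unit` and the helper `submit` are hypothetical names. `submit` pickles the arguments first (multiprocessing has to pickle them anyway to ship them to a worker), so a bad argument fails in the caller rather than later inside the pool, and then queues the call. `apply_async` hands back an `AsyncResult` straight away, so the caller is free until it chooses to wait. The sketch asks for the POSIX-only "fork" start method so the workers inherit the functions defined in the script directly.

```python
import multiprocessing as mp
import pickle


def work_unit(a, b, c):
    """Hypothetical self-contained work unit: it reads only its arguments,
    touches no globals and returns a result."""
    return (a + b) * c


def submit(pool, func, *args):
    """Fail fast if an argument can't be pickled, then queue func(*args).
    apply_async returns an AsyncResult immediately."""
    pickle.dumps(args)  # raises if any argument is unpicklable
    return pool.apply_async(func, args)


if __name__ == "__main__":
    # "fork" is POSIX-only; workers inherit work_unit from this process.
    with mp.get_context("fork").Pool(processes=2) as pool:
        pending = submit(pool, work_unit, 1, 2, 3)
        # ...control is back with the caller here...
        print(pending.get(timeout=10))  # 9
```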

For example, I want to add the ability for a user to import their contacts from the X contacts service:

Fig 1: project/views.py

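from django.shortcuts import render_to_response

def import_contacts(request):
    from my.specialsauce.contacts import run_contacts_import
    # The user waits here until every contact has been fetched and imported
    contacts = run_contacts_import(request)
    return render_to_response('t/contacts-import.html', {'contacts': contacts})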

Currently, the view retrieves and imports the user's contacts itself, so the user has to wait until the operation has completed to see the returned page. What if we could return control to the user immediately and fire off run_contacts_import in another thread, another process, or even on another server?

To do this we will need Django and the multiprocessing package, which is part of the standard library from Python 2.6 on (a backport is available for Python 2.5). I will assume you already have something like Fig 1 in place. Now say a user with 10,000 contacts (or even just 100) ambles along and imports their contacts. Either way, they will be waiting much longer than the average Gmail-generation web user can be expected to wait. They will hit refresh, restarting the import, and enough of those repeated, expensive requests can bring your server down.

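If the arguments to a work unit are pickleable, we can pass them to another process using multiprocessing. The code below passes the Django request object itself, but be aware that a full request is usually not pickleable, since it wraps the WSGI environment and its input stream; in practice you may need to pass plain values, such as the user's ID or session key, instead.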
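
Whether a given object will survive the trip to another process is easy to check by round-tripping it through pickle. In this Python 3 sketch, `FakeRequest` is a hypothetical stand-in that, like a WSGI request wrapping its input stream, holds an open file. One unpicklable attribute is enough to make the whole object unpicklable, whereas a dict of plain values copied out of it (via the hypothetical `task_args` helper) pickles fine.

```python
import os
import pickle


def is_picklable(obj):
    """Return True if obj survives a pickle round trip, i.e. multiprocessing
    could send it to another process."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True


class FakeRequest:
    """Hypothetical stand-in for a request object: plain attributes plus an
    open input stream, like the one a WSGI request wraps."""

    def __init__(self, user_id, session_key):
        self.user_id = user_id
        self.session_key = session_key
        self.stream = open(os.devnull, "rb")


def task_args(request):
    """Hypothetical helper: copy out only the plain values a worker needs."""
    return {"user_id": request.user_id, "session_key": request.session_key}


if __name__ == "__main__":
    req = FakeRequest(42, "abc123")
    print(is_picklable(req))             # False: the open file can't be pickled
    print(is_picklable(task_args(req)))  # True
    req.stream.close()
```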

Fig 2: project/views.py

from multiprocessing import Pool

from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response

# A module-level ("global") pool of 10 worker processes
m = Pool(10)

def import_contacts(request):
    # run_contacts_import will have to update request.session['CONTACT_IMPORT_STATUS']
    has_status = 'CONTACT_IMPORT_STATUS' in request.session
    status = has_status and request.session['CONTACT_IMPORT_STATUS'] == True
    if status is True:
        return HttpResponseRedirect('/import-finished/')
    elif not status and has_status:
        return render_to_response('t/contacts-import-not-finished.html', {})
    elif not has_status:
        from my.specialsauce.contacts import run_contacts_import
        # apply_async returns at once; the import runs in a pool worker.
        # The arguments are pickled on the way, so they must be pickleable.
        m.apply_async(run_contacts_import, args=(request,))
        # The import has only just started, so there are no contacts to show yet
        return render_to_response('t/contacts-import.html', {})

This seems simple enough, and on first inspection it works rather decently. However, it adds a global variable (the pool) to our Django process, which is a bad idea. If you do stick with this technique, it would be best to create a lock, spawn off a new thread, and acquire/release the lock while running apply_async (blegh). Even then, as more users come along to run the expensive process, the sheer system load can eventually make your Django server unresponsive. We need to move this process, and possibly a few others, off the web server and onto a server of its own.

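To move the work off the web server, the Python multiprocessing package provides managers. A manager class derived from multiprocessing.managers.BaseManager can run a server that holds shared objects; other Python interpreters, on the same machine or on another one, connect to that server and call methods on the shared objects through proxies.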
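
Here is a minimal, self-contained Python 3 sketch of that model. To keep it in one file, the server runs in a daemon thread via `get_server().serve_forever()`, bound to a free port on 127.0.0.1, rather than in a separate process; `Counter`, `ServerManager`, `ClientManager` and `serve_in_thread` are hypothetical names. The server-side class registers a callable that returns the shared object, the client-side class registers only the typeid, and the state lives in the server: every call through the proxy updates the same `Counter`.

```python
import threading
from multiprocessing.managers import BaseManager


class Counter:
    """The shared object. It lives in the server; clients hold proxies, so
    only method arguments and return values are pickled."""

    def __init__(self):
        self.value = 0

    def increment(self, by=1):
        self.value += by
        return self.value


counter = Counter()


class ServerManager(BaseManager):
    pass


class ClientManager(BaseManager):
    pass


# Server side: the typeid maps to a callable returning the shared object.
ServerManager.register("get_counter", callable=lambda: counter)
# Client side: same typeid, no callable, since the object lives in the server.
ClientManager.register("get_counter")


def serve_in_thread(authkey):
    """Bind to a free port on 127.0.0.1 and serve from a daemon thread
    (standing in for a separate server process). Returns the bound address."""
    server = ServerManager(address=("127.0.0.1", 0), authkey=authkey).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


if __name__ == "__main__":
    address = serve_in_thread(b"secret")
    client = ClientManager(address=address, authkey=b"secret")
    client.connect()
    proxy = client.get_counter()
    proxy.increment()
    print(proxy.increment(5))  # 6
```

Because the server handles each client connection in its own thread, a shared object used by several clients at once should guard its state with a lock.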

Create the Manager Class

First, we need to create the Manager class that our server will be built on. Create a file at project/managers.py.

Fig 3: project/managers.py

from multiprocessing.managers import BaseManager

from my.specialsauce.contacts import run_contacts_import

# This is the shared object. Its only method hands work to the instance
# variable pool, which is installed when the server is actually started
# (in a snippet below)
class ContactImporter(object):
    def __init__(self, pool=None):
        self.pool = pool

    def run_import(self, *args):
        # Returns immediately; the import runs in one of the pool's workers
        self.pool.apply_async(run_contacts_import, args=args)

# instantiate the shared object
importer = ContactImporter()

# our manager
class CManager(BaseManager):
    pass

# Register a callable that returns the shared object. The object itself
# stays in the server process and clients reach it through a proxy, so it
# does not need to be pickleable; the arguments passed to its methods do.
CManager.register('get_importer', callable=lambda: importer)
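
One detail worth knowing about shared objects like `ContactImporter`: whatever a proxied method returns is pickled and sent back to the client, so `run_import` cannot hand back the `AsyncResult` from `apply_async` (it holds a reference to the pool and a threading event, neither of which can be pickled). The Python 3 sketch below uses a hypothetical `JobImporter` class that keeps the `AsyncResult` objects on the server side and returns a plain job number instead. Like the article's version, it has its pool installed after construction, and it takes a lock because the manager serves each client connection in its own thread. A `ThreadPool` stands in for `Pool` so the example runs without extra processes; with a real `Pool`, `func` would have to be a module-level function such as `run_contacts_import`.

```python
import itertools
import threading
from multiprocessing.pool import ThreadPool


class JobImporter:
    """Hypothetical shared object in the style of ContactImporter."""

    def __init__(self, func, pool=None):
        self.func = func
        self.pool = pool               # installed later, when the server starts
        self._ids = itertools.count(1)
        self._results = {}             # job number -> AsyncResult, kept server-side
        self._lock = threading.Lock()  # the manager uses one thread per connection

    def run_import(self, *args):
        """Queue func(*args) and return a plain, picklable job number."""
        if self.pool is None:
            raise RuntimeError("no pool installed; start the server first")
        with self._lock:
            job_id = next(self._ids)
            self._results[job_id] = self.pool.apply_async(self.func, args)
        return job_id

    def is_done(self, job_id):
        """Also returns a plain value (a bool), so it is safe to call via a proxy."""
        with self._lock:
            return self._results[job_id].ready()


if __name__ == "__main__":
    importer = JobImporter(func=lambda n: n * 2)
    importer.pool = ThreadPool(processes=2)  # as lrpserver.py does with Pool
    print(importer.run_import(21))  # 1
```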

Create the Server

We are going to make a daemon server that we can start and stop using your project's manage.py file. To do this, create a file at project/management/commands/lrpserver.py (the management and commands directories each need an __init__.py file so Django can find the command).

Fig 4: project/management/commands/lrpserver.py

import errno
import os
import sys
import time
from optparse import make_option
from signal import SIGTERM

from multiprocessing import Pool

from django.core.management.base import BaseCommand
from django.conf import settings

from project.managers import CManager, importer


class Command(BaseCommand):
    option_list = BaseCommand.option_list + (
        make_option('--pidfile', dest='pidfile', default='', help="Specifies the PID file to use."),
        make_option('--start', dest='start', default='no', help='Starts the daemon'),
        make_option('--stop', dest='stop', default='no', help='Stops the daemon'),
        make_option('--restart', dest='restart', default='no', help='Restarts the daemon')
    )
    def handle(self, *args, **options):
        pidfile = options.get('pidfile')
        if not pidfile:
            print "--pidfile arg required"
            sys.exit(1)
        start = options.get('start')
        stop = options.get('stop')
        restart = options.get('restart')
        server = getattr(settings, 'LRP_SERVER_HOST', None)
        port = getattr(settings, 'LRP_SERVER_PORT', None)
        authkey = getattr(settings, 'LRP_SERVER_AUTHKEY', None)

        if server is None or port is None or authkey is None:
            print 'LRP_SERVER_HOST, LRP_SERVER_PORT and LRP_SERVER_AUTHKEY values required in settings file.'
            sys.exit(1)

        # This daemonizes our process and starts the server
        def _start():
            print 'Starting Long Running Process Server'
            from django.utils.daemonize import become_daemon
            become_daemon()
            fp = open(pidfile, "w")
            fp.write('%d\n' % os.getpid())
            fp.close()

            # set the pool that importer.run_import requires
            importer.pool = Pool(processes=10)
            # create an instance of CManager, get a server instance and start the server loop
            m = CManager(address=(server, port), authkey=authkey)
            s = m.get_server()
            s.serve_forever()

        def _stop():
            try:
                fp = open(pidfile, 'r')
                pid = int(fp.read().strip())
                fp.close()
            except (IOError, ValueError):
                # missing, unreadable or empty pidfile
                pid = None
            if not pid:
                print 'Long Running Process Server Not Currently Running'
                return
            try:
                print 'Stopping Long Running Process Server'
                # keep sending SIGTERM until the process is gone
                while 1:
                    os.kill(pid, SIGTERM)
                    time.sleep(0.1)
            except OSError, err:
                if err.errno == errno.ESRCH:
                    # "No such process": the server has exited, so clean up
                    if os.path.exists(pidfile):
                        os.remove(pidfile)
                else:
                    print err
                    sys.exit(1)

        def _restart():
            _stop()
            _start()

        if str(start) == 'yes':
            return _start()
        elif str(stop) == 'yes':
            return _stop()
        elif str(restart) == 'yes':
            return _restart()
        else:
            print 'Options: pidfile=%s start=%s stop=%s restart=%s' % (pidfile, start, stop, restart)
            print 'One of --(start|stop|restart)=yes is required.'
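
The `_stop` function above concludes that the server has exited once `os.kill` fails with "no such process". The same idea gives a cheap status check: on POSIX systems, sending signal 0 tests whether a process exists without disturbing it. The following Python 3 sketch shows hypothetical helpers that a `--status` option (not part of the command above) could use to tell a running server from a missing or stale pidfile.

```python
import errno
import os


def read_pid(pidfile):
    """Return the PID stored in pidfile, or None if the file is missing,
    unreadable or doesn't hold an integer."""
    try:
        with open(pidfile) as fp:
            return int(fp.read().strip())
    except (OSError, ValueError):
        return None


def is_running(pid):
    """Signal 0 performs kill()'s existence and permission checks
    without delivering a signal (POSIX only)."""
    try:
        os.kill(pid, 0)
    except OSError as err:
        # ESRCH: no such process. EPERM: it exists but belongs to another user.
        return err.errno == errno.EPERM
    return True


def server_status(pidfile):
    """Hypothetical helper for a --status option."""
    pid = read_pid(pidfile)
    if pid is None or pid <= 0:
        return "stopped"
    return "running" if is_running(pid) else "stale pidfile"


if __name__ == "__main__":
    print(server_status("/home/django/run/lrp.pid"))
```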

Add the Required Settings to Your settings.py File

The server and client require three settings: the host name of your server, the port on which the server listens, and the auth key required to access the server. Add the following to your settings.py file, substituting your own values. For now, use the empty string '' as the host name; the server will then listen on all of the machine's network interfaces, which includes localhost. (Hint: if you do not know your machine's host name, use socket.getfqdn().)

Fig 5: settings.py

LRP_SERVER_HOST = ''             # '' = all interfaces; remote clients need the real host name
LRP_SERVER_PORT = 5667
LRP_SERVER_AUTHKEY = 'password'  # choose a real secret
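
If you run this code under Python 3, note that `BaseManager` requires the authkey to be bytes, so a string setting has to be encoded. A hypothetical helper such as `lrp_address` below gathers the three settings into the `(address, authkey)` pair the manager expects and fails loudly if one is missing; in the sketch, `SimpleNamespace` stands in for `django.conf.settings`.

```python
import socket
from types import SimpleNamespace

REQUIRED = ("LRP_SERVER_HOST", "LRP_SERVER_PORT", "LRP_SERVER_AUTHKEY")


def lrp_address(settings):
    """Hypothetical helper: return ((host, port), authkey) for BaseManager.
    Raises ValueError naming any missing setting; encodes a str authkey to
    bytes, which BaseManager requires under Python 3."""
    missing = [name for name in REQUIRED if getattr(settings, name, None) is None]
    if missing:
        raise ValueError("missing settings: " + ", ".join(missing))
    authkey = settings.LRP_SERVER_AUTHKEY
    if isinstance(authkey, str):
        authkey = authkey.encode("utf-8")
    return (settings.LRP_SERVER_HOST, int(settings.LRP_SERVER_PORT)), authkey


if __name__ == "__main__":
    # SimpleNamespace stands in for django.conf.settings here.
    fake = SimpleNamespace(LRP_SERVER_HOST="", LRP_SERVER_PORT=5667,
                           LRP_SERVER_AUTHKEY="password")
    print(lrp_address(fake))  # (('', 5667), b'password')
    # The name remote clients would use to reach this machine:
    print(socket.getfqdn())
```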

Modify Your View Code

To connect to the server, the view spawns a thread which creates an instance of the manager, connects, and sends the command. Doing this in a thread means the user does not have to wait on the connection either.

Fig 6: project/views.py

from threading import Thread
from multiprocessing.managers import BaseManager

from django.conf import settings
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response


# The client-side manager registers only the typeid, not the callable
class CManager(BaseManager):
    pass


CManager.register('get_importer')

# This function will become a thread which instantiates the Manager class,
# connects and starts the long running process
def _start_import_thread(request):
    lrp_manager = CManager(address=(settings.LRP_SERVER_HOST, settings.LRP_SERVER_PORT),
                           authkey=settings.LRP_SERVER_AUTHKEY)
    lrp_manager.connect()
    importer = lrp_manager.get_importer()
    # The arguments are pickled and sent to the server, so they must be pickleable
    importer.run_import(request)

# This is not a view, it's a helper function which starts
# the thread above
def start_contacts_import(request):
    t = Thread(target=_start_import_thread, args=[request])
    t.setDaemon(True)
    t.start()

def import_contacts(request):
    # run_contacts_import will have to update request.session['CONTACT_IMPORT_STATUS']
    has_status = 'CONTACT_IMPORT_STATUS' in request.session
    status = has_status and request.session['CONTACT_IMPORT_STATUS'] == True
    if status is True:
        return HttpResponseRedirect('/import-finished/')
    elif not status and has_status:
        return render_to_response('t/contacts-import-not-finished.html', {})
    elif not has_status:
        start_contacts_import(request)
        # The import has only just started, so there are no contacts to show yet
        return render_to_response('t/contacts-import.html', {})
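
One weakness of a fire-and-forget thread is that nobody sees its failures: if the long running process server is down, the exception raised in the thread is simply printed to stderr and the user is none the wiser. The Python 3 sketch below is a variant that logs the failure instead. `_send_import`, `start_import` and the logger name are hypothetical, and the address and authkey are passed in rather than read from settings so the functions can be exercised outside Django.

```python
import logging
import threading
from multiprocessing.managers import BaseManager

log = logging.getLogger("lrp.client")


class CManager(BaseManager):
    pass


# Client side: the typeid only; the callable is registered on the server.
CManager.register("get_importer")


def _send_import(address, authkey, *args):
    """Body of the background thread: connect, get the proxy, send the
    command. Nobody joins this thread, so failures are logged here."""
    try:
        manager = CManager(address=address, authkey=authkey)
        manager.connect()
        manager.get_importer().run_import(*args)
    except Exception:
        log.exception("could not hand the import to the LRP server at %r", address)


def start_import(address, authkey, *args):
    """Fire-and-forget: start the daemon thread and return it at once."""
    t = threading.Thread(target=_send_import, args=(address, authkey) + args,
                         daemon=True)
    t.start()
    return t
```

A view would call `start_import` with the host/port pair from settings, the authkey as bytes, and pickleable arguments, then render its response without waiting on the thread.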

Start the Server and Let it Fly!

Finally, start the server:

python manage.py lrpserver --pidfile=/home/django/run/lrp.pid --start=yes

Now you can run your view.

Follow-Up

This process is applicable to more than just crawlers and contact importers. It can take load off your web server for any number of tasks, such as image and other file processing or running large database updates. The server can be moved to another machine simply by duplicating your Django installation there and using that machine's socket.getfqdn() value (or IP address) as the host name in your settings. You can also add a JavaScript status updater to notify the user of the progress of their request.
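
A JavaScript status updater needs something to poll. As a sketch, the three states that the views derive from `request.session['CONTACT_IMPORT_STATUS']` can be turned into a small JSON payload. `import_status` is a hypothetical Python 3 helper; a Django view could return its result in an `HttpResponse` with a JSON content type.

```python
import json

STATUS_KEY = "CONTACT_IMPORT_STATUS"


def import_status(session):
    """Hypothetical helper: map the session flag to a JSON string.
    Absent -> not started; True -> finished; any other value -> running."""
    if STATUS_KEY not in session:
        state = "not_started"
    elif session[STATUS_KEY] is True:
        state = "finished"
    else:
        state = "running"
    return json.dumps({"status": state})


if __name__ == "__main__":
    print(import_status({}))                   # {"status": "not_started"}
    print(import_status({STATUS_KEY: False}))  # {"status": "running"}
```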