SaltyCrane Blog: Notes on Python, Javascript and web development

A bank style session timeout example using jQuery, Bootstrap, and Flask

This is an example that uses Javascript to display a session timeout warning modal 10 minutes before session expiration. It also resets the session expiration whenever the user clicks the mouse. It uses Javascript, jQuery, and Bootstrap on the frontend and Python, Flask, Flask-Login, and WTForms on the backend.

  • Mouse clicks anywhere on the page ping the server at a maximum frequency of once per minute and reset the session expiration.
  • 10 minutes before the session expiration, a warning modal is displayed with two buttons: "Log out" and "Stay Logged In".
  • If the user clicks "Stay Logged In" the session expiration is reset.
  • If the user clicks "Log out", the user is logged out.
  • If the user does nothing for 10 minutes after the warning appears, the user is logged out and shown a message that the session timed out.

Here is the Javascript (session-monitor.js):

sessionMonitor = function(options) {
    "use strict";

    var defaults = {
            // Session lifetime (milliseconds)
            sessionLifetime: 60 * 60 * 1000,
            // Amount of time before session expiration when the warning is shown (milliseconds)
            timeBeforeWarning: 10 * 60 * 1000,
            // Minimum time between pings to the server (milliseconds)
            minPingInterval: 1 * 60 * 1000,
            // Space-separated list of events passed to $(document).on() that indicate a user is active
            activityEvents: 'mouseup',
            // URL to ping the server using HTTP POST to extend the session
            pingUrl: '/ping',
            // URL used to log out when the user clicks a "Log out" button
            logoutUrl: '/logout',
            // URL used to log out when the session times out
            timeoutUrl: '/logout?timeout=1',
            ping: function() {
                // Ping the server to extend the session expiration using a POST request.
                $.ajax({
                    type: 'POST',
                    url: self.pingUrl
                });
            },
            logout: function() {
                // Go to the logout page.
                window.location.href = self.logoutUrl;
            },
            onwarning: function() {
                // Below is example code to demonstrate basic functionality. Use this to warn
                // the user that the session will expire and allow the user to take action.
                // Override this method to customize the warning.
                var warningMinutes = Math.round(self.timeBeforeWarning / 60 / 1000),
                    $alert = $('<div id="jqsm-warning">Your session will expire in ' + warningMinutes + ' minutes. ' +
                               '<button id="jqsm-stay-logged-in">Stay Logged In</button>' +
                               '<button id="jqsm-log-out">Log Out</button>' +
                               '</div>');

                if (!$('body').children('div#jqsm-warning').length) {
                    $('body').prepend($alert);
                }
                $('div#jqsm-warning').show();
                $('button#jqsm-stay-logged-in').on('click', self.extendsess)
                    .on('click', function() { $alert.hide(); });
                $('button#jqsm-log-out').on('click', self.logout);
            },
            onbeforetimeout: function() {
                // By default this does nothing. Override this method to perform actions
                // (such as saving draft data) before the user is automatically logged out.
                // This may optionally return a jQuery Deferred object, in which case
                // ontimeout will be executed when the deferred is resolved or rejected.
            },
            ontimeout: function() {
                // Go to the timeout page.
                window.location.href = self.timeoutUrl;
            }
        },
        self = {},
        _warningTimeoutID,
        _expirationTimeoutID,
        // The time of the last ping to the server.
        _lastPingTime = 0;

    function extendsess() {
        // Extend the session expiration. Ping the server and reset the timers if
        // the minimum interval has passed since the last ping.
        var now = $.now(),
            timeSinceLastPing = now - _lastPingTime;

        if (timeSinceLastPing > self.minPingInterval) {
            _lastPingTime = now;
            _resetTimers();
            self.ping();
        }
    }

    function _resetTimers() {
        // Reset the session warning and session expiration timers.
        var warningTimeout = self.sessionLifetime - self.timeBeforeWarning;

        window.clearTimeout(_warningTimeoutID);
        window.clearTimeout(_expirationTimeoutID);
        _warningTimeoutID = window.setTimeout(self.onwarning, warningTimeout);
        _expirationTimeoutID = window.setTimeout(_onTimeout, self.sessionLifetime);
    }

    function _onTimeout() {
        // A wrapper that calls onbeforetimeout and ontimeout and supports asynchronous code.
        $.when(self.onbeforetimeout()).always(self.ontimeout);
    }

    // Add default variables and methods, user specified options, and non-overridable
    // public methods to the session monitor instance.
    $.extend(self, defaults, options, {
        extendsess: extendsess
    });
    // Set an event handler to extend the session upon user activity (e.g. mouseup).
    $(document).on(self.activityEvents, extendsess);
    // Start the timers and ping the server to ensure they are in sync with the backend session expiration.
    extendsess();

    return self;
};

Here is the important HTML:

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="utf-8" />
        <title>Login</title>
        <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.2.0/css/bootstrap.min.css" />
    </head>
    <body>
        <div id="session-warning-modal" class="modal">
            <div class="modal-dialog">
                <div class="modal-content">
                    <div class="modal-header">
                        <h4 class="modal-title" id="sessWarnLabel">Your session is about to expire</h4>
                    </div>
                    <div class="modal-body">
                        Your session will expire in <span id="remaining-time"></span> minutes due to inactivity.
                    </div>
                    <div class="modal-footer">
                        <button id="log-out" class="btn btn-default" type="button" data-dismiss="modal">Log Out</button>
                        <button id="stay-logged-in" class="btn btn-warning" type="button" data-dismiss="modal">Stay Logged In</button>
                    </div>
                </div>
            </div>
        </div>
        <script src="//ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
        <script src="//maxcdn.bootstrapcdn.com/bootstrap/3.2.0/js/bootstrap.min.js"></script>
        <script src="{{ url_for('static', filename='session-monitor.js')}}"></script>
        <script type="text/javascript">
            // Configure and start the session timeout monitor
            sessMon = sessionMonitor({
                // Subtract 1 minute to ensure the backend doesn't expire the session first
                sessionLifetime: {{ PERMANENT_SESSION_LIFETIME_MS }} - (1 * 60 * 1000),
                timeBeforeWarning: 10 * 60 * 1000,  // 10 minutes
                minPingInterval: 1 * 60 * 1000,  // 1 minute
                pingUrl: '/ping',
                logoutUrl: '/logout',
                timeoutUrl: '/logged-out?timeout=1&next=' + encodeURIComponent(
                    window.location.pathname + window.location.search + window.location.hash),
                // The "mouseup" event was used instead of "click" because some of the
                // inner elements on some pages have click event handlers that stop propagation.
                activityEvents: 'mouseup',
                onwarning: function() {
                    $("#session-warning-modal").modal("show");
                }
            });
            $(document).ready( function() {
                // Configure the session timeout warning modal
                $("#session-warning-modal")
                    .modal({
                        "backdrop": "static",
                        "keyboard": false,
                        "show": false
                    })
                    .on("click", "#stay-logged-in", sessMon.extendsess)
                    .on("click", "#log-out", sessMon.logout)
                    .find("#remaining-time").text(Math.round(sessMon.timeBeforeWarning / 60 / 1000));
            });
            window.sessMon = sessMon;
        </script>
    </body>
</html>
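
The timer arithmetic is a little buried in the Javascript, so here is a minimal Python 3 sketch of the same two calculations. It assumes the 60 minute PERMANENT_SESSION_LIFETIME used by the Flask app and the 1 minute safety margin from the configuration above. The names `timer_delays` and `should_ping` are made up for illustration and are not part of session-monitor.js.

```python
MINUTE_MS = 60 * 1000


def timer_delays(session_lifetime_ms, time_before_warning_ms):
    """Return (warning_delay_ms, expiration_delay_ms), mirroring _resetTimers."""
    warning_delay_ms = session_lifetime_ms - time_before_warning_ms
    return warning_delay_ms, session_lifetime_ms


def should_ping(now_ms, last_ping_ms, min_ping_interval_ms):
    """Mirror the throttle check in extendsess."""
    return now_ms - last_ping_ms > min_ping_interval_ms


# Assumed backend lifetime of 60 minutes, minus the 1 minute safety margin.
client_lifetime_ms = 60 * MINUTE_MS - 1 * MINUTE_MS
warning_delay_ms, expiration_delay_ms = timer_delays(
    client_lifetime_ms, 10 * MINUTE_MS)

print(warning_delay_ms // MINUTE_MS)     # 49
print(expiration_delay_ms // MINUTE_MS)  # 59
print(should_ping(90 * 1000, 60 * 1000, MINUTE_MS))   # False (30 s since last ping)
print(should_ping(121 * 1000, 60 * 1000, MINUTE_MS))  # True (61 s since last ping)
```

With these settings the modal appears 49 minutes after the last ping and the redirect to the timeout URL happens at 59 minutes.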

Here is the Python Flask app (myapp.py):

import collections
import datetime

from flask import Flask, request, render_template, redirect, url_for, session
from flask_login import (
    LoginManager, login_user, logout_user,  UserMixin, login_required)
from wtforms.fields import PasswordField, StringField
from wtforms.form import Form


UserRow = collections.namedtuple('UserRow', ['id', 'password'])
TOY_USER_DATABASE = {
    'george': UserRow(id=1, password='george'),
}


# settings ###############################################################
# Set a secret key to sign the session (Flask config value)
SECRET_KEY = 'insert secret key here'

# The amount of time after which the user's session expires
# (this is a Flask setting and is also used by the Javascript)
PERMANENT_SESSION_LIFETIME = datetime.timedelta(minutes=60)


# init ###############################################################
app = Flask(__name__)
app.config.from_object(__name__)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = '.login'


@login_manager.user_loader
def load_user(userid):
    return User(userid)


@app.context_processor
def add_session_config():
    """Add current_app.permanent_session_lifetime converted to milliseconds
    to context. The config variable PERMANENT_SESSION_LIFETIME is not
    used because it could be either a timedelta object or an integer
    representing seconds.
    """
    return {
        'PERMANENT_SESSION_LIFETIME_MS': (
            int(app.permanent_session_lifetime.total_seconds() * 1000)),
    }


# models ###############################################################
class User(UserMixin):
    def __init__(self, id):
        self.id = id


# forms ###############################################################
class LoginForm(Form):
    username = StringField()
    password = PasswordField()


# views ###############################################################
@app.route("/login", methods=['GET', 'POST'])
def login():
    form = LoginForm(request.form)
    message = ''

    if request.method == 'POST' and form.validate():
        db_user = TOY_USER_DATABASE.get(form.username.data)
        if db_user is not None and form.password.data == db_user.password:
            user = User(db_user.id)
            login_user(user)
            return redirect(url_for('.home'))
        else:
            message = 'Login failed.'

    context = {
        'form': form,
        'message': message,
    }
    return render_template('login.html', **context)


@app.route("/")
@login_required
def home():
    return render_template('home.html')


@app.route("/another-page")
@login_required
def another_page():
    return render_template('another_page.html')


@app.route("/logout")
@login_required
def logout():
    logout_user()
    return redirect(url_for('.logged_out') + '?' + request.query_string.decode('utf-8'))


@app.route("/logged-out")
def logged_out():
    timed_out = request.args.get('timeout')
    return render_template('logged_out.html', timed_out=timed_out)


@app.route("/ping", methods=['POST'])
def ping():
    session.modified = True
    return 'OK'


if __name__ == "__main__":
    app.run(debug=True)
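
One detail worth spelling out about the context processor, which hands the session lifetime to the Javascript in milliseconds: a timedelta's `.seconds` attribute holds only the seconds part of the duration and ignores whole days, whereas `total_seconds()` covers the full span. The sketch below (Python 3, with a made-up helper name `lifetime_ms`) shows the difference. It does not matter for a 60 minute lifetime, but Flask's default lifetime is 31 days, for which `.seconds` is 0.

```python
import datetime


def lifetime_ms(lifetime):
    """Convert a timedelta to whole milliseconds (hypothetical helper)."""
    return int(lifetime.total_seconds() * 1000)


one_hour = datetime.timedelta(minutes=60)
print(one_hour.seconds * 1000)      # 3600000
print(lifetime_ms(one_hour))        # 3600000

long_session = datetime.timedelta(days=1, minutes=5)
print(long_session.seconds * 1000)  # 300000 (the day is dropped)
print(lifetime_ms(long_session))    # 86700000
```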

The full example is also on github at: https://github.com/saltycrane/session-timeout-example.

Initial ideas were taken from http://www.itworld.com/development/335546/how-create-session-timeout-warning-your-web-application-using-jquery but, from what I could tell, it pinged the server whether the user was active or not.

The old "%" string formatting and the new string .format() method handle unicode differently

Today I learned that the old style "%" string formatting and the new string .format() method behave differently when interpolating unicode strings. I was surprised to find out that one of these lines raised an error while the other did not:

'%s' % u'O\u2019Connor'
'{}'.format(u'O\u2019Connor')

The old style "%" formatting operation returns a unicode string if one of the values is a unicode string even when the format string is a non-unicode string:

Python 2.7.3 (default, Feb 27 2014, 19:58:35) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> '%s' % u'O\u2019Connor'
u'O\u2019Connor'

The new string .format() method, when called on a non-unicode string with a unicode string argument, tries to encode the unicode string to a non-unicode string (bytestring), possibly raising a UnicodeEncodeError:

Python 2.7.3 (default, Feb 27 2014, 19:58:35) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> '{}'.format(u'O\u2019Connor')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 1: ordinal not in range(128)

I guess the correct thing to do is to start with a unicode format string:

Python 2.7.3 (default, Feb 27 2014, 19:58:35) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> u'{}'.format(u'O\u2019Connor')
u'O\u2019Connor'


Python logging filters do not propagate like handlers and levels do

Loggers are organized in a hierarchical fashion. A logger named 'foo.bar' is a child of a logger named 'foo'.

getLogger() returns a reference to a logger instance with the specified name if it is provided, or root if not. The names are period-separated hierarchical structures. Multiple calls to getLogger() with the same name will return a reference to the same logger object. Loggers that are further down in the hierarchical list are children of loggers higher up in the list. For example, given a logger with a name of foo, loggers with names of foo.bar, foo.bar.baz, and foo.bam are all descendants of foo. - Loggers documentation

If the level is not set on a logger, the level of the parent is used.

Loggers have a concept of effective level. If a level is not explicitly set on a logger, the level of its parent is used instead as its effective level. If the parent has no explicit level set, its parent is examined, and so on - all ancestors are searched until an explicitly set level is found. The root logger always has an explicit level set (WARNING by default). When deciding whether to process an event, the effective level of the logger is used to determine whether the event is passed to the logger’s handlers. - Loggers documentation

import logging

foo_logger = logging.getLogger('foo')
foo_logger.setLevel(20)

foo_bar_logger = logging.getLogger('foo.bar')

print(foo_logger.getEffectiveLevel())
print(foo_bar_logger.getEffectiveLevel())
20
20
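
To see the lookup walk more than one step, here is a small Python 3 sketch. The logger names are invented for the example. The grandchild has no level of its own (and the intermediate `sketch_levels.db` logger is never created), so its effective level comes from the nearest ancestor with an explicit level until it gets one itself.

```python
import logging

parent = logging.getLogger('sketch_levels')
parent.setLevel(logging.INFO)
child = logging.getLogger('sketch_levels.db.queries')

print(child.level)                        # 0 (logging.NOTSET, nothing set explicitly)
print(child.getEffectiveLevel())          # 20 (inherited from 'sketch_levels')
print(child.isEnabledFor(logging.DEBUG))  # False

# An explicit level on the child takes precedence and leaves the parent alone.
child.setLevel(logging.DEBUG)
print(child.getEffectiveLevel())          # 10
print(parent.getEffectiveLevel())         # 20
```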

Similarly, if a handler is not defined for a logger, its messages are passed up to the handlers of its parent.

Child loggers propagate messages up to the handlers associated with their ancestor loggers. Because of this, it is unnecessary to define and configure handlers for all the loggers an application uses. It is sufficient to configure handlers for a top-level logger and create child loggers as needed. (You can, however, turn off propagation by setting the propagate attribute of a logger to False.) - Loggers documentation

import logging

myformatter = logging.Formatter("MY HANDLER: %(name)s - %(message)s")

myhandler = logging.StreamHandler()
myhandler.setFormatter(myformatter)

foo_logger = logging.getLogger('foo')
foo_logger.addHandler(myhandler)

foo_bar_logger = logging.getLogger('foo.bar')

foo_logger.error('asdfasdf')
foo_bar_logger.error('zxcvzxcv')
MY HANDLER: foo - asdfasdf
MY HANDLER: foo.bar - zxcvzxcv
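
The quoted documentation also mentions turning propagation off. A minimal Python 3 sketch of that, using a made-up `ListHandler` that collects messages in a list so the effect is easy to inspect (the logger names are also invented):

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that stores formatted messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.name + ' - ' + record.getMessage())


handler = ListHandler()
parent = logging.getLogger('sketch_propagate')
parent.addHandler(handler)
child = logging.getLogger('sketch_propagate.bar')

child.error('first')     # reaches the parent's handler
child.propagate = False
child.error('second')    # no longer reaches the parent's handler; with no handler
                         # of its own, Python 3 falls back to its last-resort
                         # stderr handler

print(handler.messages)  # ['sketch_propagate.bar - first']
```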

However, filters, unlike levels and handlers, do not propagate. If a filter is not defined for a logger, the filter of the parent is NOT used.

Note that filters attached to handlers are consulted before an event is emitted by the handler, whereas filters attached to loggers are consulted whenever an event is logged (using debug(), info(), etc.), before sending an event to handlers. This means that events which have been generated by descendant loggers will not be filtered by a logger’s filter setting, unless the filter has also been applied to those descendant loggers. - Filter Objects documentation
See also the logging flowchart.

import logging

class MyFilter(logging.Filter):
    def filter(self, record):
        record.msg = 'MY FILTER: ' + record.msg
        return 1

myfilter = MyFilter()

myformatter = logging.Formatter("MY HANDLER: %(name)s - %(message)s")

myhandler = logging.StreamHandler()
myhandler.setFormatter(myformatter)

foo_logger = logging.getLogger('foo')
foo_logger.addFilter(myfilter)
foo_logger.addHandler(myhandler)

foo_bar_logger = logging.getLogger('foo.bar')

foo_logger.error('asdfasdf')
foo_bar_logger.error('zxcvzxcv')
MY HANDLER: foo - MY FILTER: asdfasdf
MY HANDLER: foo.bar - zxcvzxcv

I guess I'll attach it to the handler instead...

import logging

class MyFilter(logging.Filter):
    def filter(self, record):
        record.msg = 'MY FILTER: ' + record.msg
        return 1

myfilter = MyFilter()

myformatter = logging.Formatter("MY HANDLER: %(name)s - %(message)s")

myhandler = logging.StreamHandler()
myhandler.setFormatter(myformatter)
myhandler.addFilter(myfilter)

foo_logger = logging.getLogger('foo')
foo_logger.addHandler(myhandler)

foo_bar_logger = logging.getLogger('foo.bar')

foo_logger.error('asdfasdf')
foo_bar_logger.error('zxcvzxcv')
MY HANDLER: foo - MY FILTER: asdfasdf
MY HANDLER: foo.bar - MY FILTER: zxcvzxcv
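
For anyone who wants to check both behaviors side by side, here is a Python 3 sketch that runs the two setups against an in-memory stream instead of stderr. `PrefixFilter`, `run`, and the `sketch_*` logger names are made up for this example; the filter does the same thing as MyFilter above.

```python
import io
import logging


class PrefixFilter(logging.Filter):
    """Same idea as MyFilter: tag the message and let the record through."""

    def filter(self, record):
        record.msg = 'MY FILTER: ' + str(record.msg)
        return True


def run(attach_to):
    """Log from a parent and a child; attach the filter to 'logger' or 'handler'."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter('%(name)s - %(message)s'))

    # Logger names are unique per variant so the two runs do not interfere.
    parent = logging.getLogger('sketch_' + attach_to)
    child = logging.getLogger('sketch_' + attach_to + '.bar')
    parent.addHandler(handler)

    target = parent if attach_to == 'logger' else handler
    target.addFilter(PrefixFilter())

    parent.error('asdfasdf')
    child.error('zxcvzxcv')
    return stream.getvalue().splitlines()


logger_lines = run('logger')
handler_lines = run('handler')
print(logger_lines)
# ['sketch_logger - MY FILTER: asdfasdf', 'sketch_logger.bar - zxcvzxcv']
print(handler_lines)
# ['sketch_handler - MY FILTER: asdfasdf', 'sketch_handler.bar - MY FILTER: zxcvzxcv']
```

One caveat with this style of filter: because it mutates the record, attaching it to several handlers that see the same record would add the prefix once per handler.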

Subdomain-based configuration for a Flask local development server

This example shows how to set up a Flask local development server to use a different configuration based on the subdomain of the request. The project I work on has several environments (dev, qa, staging, etc). Each environment has different database and API hostnames. I use this to switch between database and API environments quickly while using my local development server.

This assumes a create_app function is used to create the Flask application instance as described in the Application Factories Flask documentation.

create_app

Modify the create_app function to take a configobj argument and use it to override the default configuration.

def create_app(configobj=None):
    app = Flask(__name__)

    # Default configuration
    app.config.from_object(__name__)

    # Override configuration using config passed into create_app
    if configobj:
        app.config.from_object(configobj)

    return app

SubdomainDispatcher

The SubdomainDispatcher is taken from the Application Dispatching Flask documentation. It is WSGI middleware that looks at the subdomain of the request and returns a different application instance for each subdomain. It calls the create_app function above and passes it the appropriate configuration object for the subdomain.

from threading import Lock


class SubdomainDispatcher(object):

    def __init__(self, create_app, domain=''):
        """
        :param create_app: a function that returns a `flask.Flask` instance
        :param domain: str - used to determine the subdomain
        """
        self.create_app = create_app
        self.domain = domain
        self.lock = Lock()
        self.instances = {}

    def __call__(self, environ, start_response):
        app = self._get_application(environ['HTTP_HOST'])
        return app(environ, start_response)

    def _get_application(self, host):
        host = host.split(':')[0]
        assert host.endswith(self.domain), 'Configuration error'
        subdomain = host[:-len(self.domain)].rstrip('.')
        with self.lock:
            app = self.instances.get(subdomain)
            if app is None:
                configobj = self._get_subdomain_based_config(subdomain)
                app = self.create_app(configobj=configobj)
                self.instances[subdomain] = app
            return app

    @staticmethod
    def _get_subdomain_based_config(subdomain):

        class Config(object):
            pass
        config = Config()

        if subdomain == 'dev':
            config.API_HOST = 'dev-host'
            config.DB_SERVER = 'dev-db-server'
        elif subdomain == 'qa':
            config.API_HOST = 'qa-host'
            config.DB_SERVER = 'qa-db-server'

        return config
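
The subdomain parsing in _get_application is compact, so here is the same slicing pulled out into a standalone Python 3 sketch. The function name `extract_subdomain` is made up, and it raises ValueError where the class above uses an assert.

```python
def extract_subdomain(host, domain):
    """Return the part of `host` in front of `domain`, ignoring any port."""
    host = host.split(':')[0]              # drop the port, e.g. ':5000'
    if not host.endswith(domain):
        raise ValueError('Configuration error')
    # Cut the domain off the end, then the dot that separated it.
    return host[:-len(domain)].rstrip('.')


print(extract_subdomain('dev.localhost:5000', 'localhost'))  # dev
print(extract_subdomain('qa.localhost', 'localhost'))        # qa
print(repr(extract_subdomain('localhost:5000', 'localhost')))  # ''
```

Note that with the default `domain=''` the slice becomes `host[:0]`, so the subdomain is always the empty string and every request gets the same application instance. The domain needs to be passed in for the dispatching to do anything.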

rundevserver

rundevserver is similar to flask.Flask.run but uses the SubdomainDispatcher middleware before calling werkzeug.serving.run_simple.

def rundevserver(host=None, port=None, domain='', debug=True, **options):
    """
    Modified from `flask.Flask.run`

    Runs the application on a local development server.

    :param host: the hostname to listen on. Set this to ``'0.0.0.0'`` to
                 have the server available externally as well. Defaults to
                 ``'127.0.0.1'``.
    :param port: the port of the webserver. Defaults to ``5000``
    :param domain: used to determine the subdomain
    :param debug: if given, enable or disable debug mode.
                  See :attr:`debug`.
    :param options: the options to be forwarded to the underlying
                    Werkzeug server. See
                    :func:`werkzeug.serving.run_simple` for more
                    information.
    """
    from werkzeug.serving import run_simple

    if host is None:
        host = '127.0.0.1'
    if port is None:
        port = 5000
    options.setdefault('use_reloader', debug)
    options.setdefault('use_debugger', debug)

    app = SubdomainDispatcher(create_app, domain)

    run_simple(host, port, app, **options)

Usage

  1. Add the following to your hosts file (/etc/hosts on Ubuntu):
    0.0.0.0 dev.localhost
    0.0.0.0 qa.localhost
  2. Run the local dev server:
    if __name__ == '__main__':
        rundevserver(host='0.0.0.0', port=5000, domain='localhost')
    
  3. Use the following URLs to get different app configurations:
    http://dev.localhost:5000
    http://qa.localhost:5000

Github code

A full working example is located on github: flask-subdomain-dispatcher-example. An updated version of the code is here: flask-subdomaindevserver.

How to add a margin around markers in the Google Static Maps API using Python

This example shows how to use Python to generate a Google Static Map URL for a map that contains markers within some dimensions which are smaller than the map image dimensions. This effectively allows for setting minimum X and Y margins around the markers in a map. This is useful for a "fluid" web design where a maximum map size is requested from Google and is then cut off at the edges for small browser windows.

The bulk of this solution is based on the Javascript code here: http://stackoverflow.com/questions/6048975/google-maps-v3-how-to-calculate-the-zoom-level-for-a-given-bounds

import math


def generate_map_url(
        min_map_width_px,
        max_map_width_px,
        min_map_height_px,
        max_map_height_px,
        marker_groups):
    """
    Return a Google Static Map URL for a map that contains markers within
    some dimensions which are smaller than the map image dimensions. This
    effectively allows for setting minimum X and Y margins around the markers
    in a map. This is useful for a "fluid" web design where a maximum map
    size is requested from Google and is then cut off at the edges for
    small browser windows.
    """
    # Determine the maximum zoom to contain markers at the minimum map size
    lat_list = [
        lat for markers in marker_groups for lat, lng in markers['lat_lng']]
    lng_list = [
        lng for markers in marker_groups for lat, lng in markers['lat_lng']]
    max_zoom = get_zoom_to_fit(
        min(lat_list), max(lat_list), min(lng_list), max(lng_list),
        min_map_width_px, min_map_height_px,
    )

    # Build the markers query string arguments
    markers_args = ''
    for markers in marker_groups:
        lat_lng = '|'.join([
            '{},{}'.format(lat, lng) for lat, lng in markers['lat_lng']])
        markers_args += '&markers=color:{}|{}'.format(markers['color'], lat_lng)

    # Build and return the map URL
    return ''.join([
        'http://maps.googleapis.com/maps/api/staticmap',
        '?sensor=false&v=3&visual_refresh=true',
        '&size={}x{}&zoom={}'.format(
            max_map_width_px, max_map_height_px, max_zoom),
        markers_args,
    ])


def get_zoom_to_fit(min_lat, max_lat, min_lng, max_lng, width_px, height_px):
    """
    Return the maximum zoom that will fit the given min/max lat/lng
    coordinates in a map of the given dimensions. This is used to
    override the zoom set by Google's implicit positioning.

    Calculation translated from Javascript to Python from:
    http://stackoverflow.com/questions/6048975/google-maps-v3-how-to-calculate-the-zoom-level-for-a-given-bounds
    """
    GOOGLE_WORLD_WIDTH = 256
    GOOGLE_WORLD_HEIGHT = 256
    MAX_ZOOM = 17

    def lat2rad(lat):
        sinlat = math.sin(math.radians(lat))
        radx2 = math.log((1 + sinlat) / (1 - sinlat)) / 2.0
        return max(min(radx2, math.pi), -math.pi) / 2.0

    def zoom(map_px, world_px, fraction):
        # Use int() to round down to the nearest integer
        return int(
            math.log(float(map_px) / float(world_px) / fraction)
            / math.log(2.0)
        )

    # Determine the maximum zoom based on height and latitude
    if min_lat == max_lat:
        lat_zoom = MAX_ZOOM
    else:
        lat_fraction = (lat2rad(max_lat) - lat2rad(min_lat)) / math.pi
        lat_zoom = zoom(height_px, GOOGLE_WORLD_HEIGHT, lat_fraction)

    # Determine the maximum zoom based on width and longitude
    if min_lng == max_lng:
        lng_zoom = MAX_ZOOM
    else:
        lng_range = max_lng - min_lng
        if lng_range < 0:
            lng_range += 360.0
        lng_fraction = lng_range / 360.0
        lng_zoom = zoom(width_px, GOOGLE_WORLD_WIDTH, lng_fraction)

    return min(lat_zoom, lng_zoom, MAX_ZOOM)

Here is an example:

map_url = generate_map_url(
    min_map_width_px=240, max_map_width_px=380,
    min_map_height_px=285, max_map_height_px=325,
    marker_groups=[
        {'color': 'blue',
         'lat_lng': [(34.0993, -118.8394)]},
        {'color': 'orange',
         'lat_lng': [
             (34.3997, -119.2002),
             (34.5389, -118.4499),
             (34.0983, -118.1285),
             (33.5932, -117.9455),
             (33.8322, -117.3958),
         ]}
    ]
)
print(map_url)

Here is a map without the margin: http://maps.googleapis.com/maps/api/staticmap?sensor=false&v=3&visual_refresh=true&size=380x325&markers=color:blue|34.0993,-118.8394&markers=color:orange|34.3997,-119.2002|34.5389,-118.4499|34.0983,-118.1285|33.5932,-117.9455|33.8322,-117.3958

Here is the result with the margin: http://maps.googleapis.com/maps/api/staticmap?sensor=false&v=3&visual_refresh=true&size=380x325&zoom=7&markers=color:blue|34.0993,-118.8394&markers=color:orange|34.3997,-119.2002|34.5389,-118.4499|34.0983,-118.1285|33.5932,-117.9455|33.8322,-117.3958
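
To show where the zoom=7 comes from, here is a condensed Python 3 sketch of the same calculation applied to the bounding box of the example markers. The helper names `mercator_fraction`, `zoom`, and `fit` are made up for this sketch, and it leaves out the MAX_ZOOM cap and the equal-coordinate special cases handled by get_zoom_to_fit.

```python
import math


def mercator_fraction(min_lat, max_lat):
    """Fraction of the world's height covered by the latitude span."""
    def lat2rad(lat):
        sinlat = math.sin(math.radians(lat))
        radx2 = math.log((1 + sinlat) / (1 - sinlat)) / 2.0
        return max(min(radx2, math.pi), -math.pi) / 2.0
    return (lat2rad(max_lat) - lat2rad(min_lat)) / math.pi


def zoom(map_px, fraction, world_px=256):
    """Largest integer zoom at which `fraction` of the world fits in map_px."""
    return int(math.log(map_px / world_px / fraction, 2))


# Bounding box of the blue and orange markers in the example.
lat_fraction = mercator_fraction(33.5932, 34.5389)
lng_fraction = (-117.3958 - -119.2002) / 360.0


def fit(width_px, height_px):
    return min(zoom(height_px, lat_fraction), zoom(width_px, lng_fraction))


print(fit(240, 285))  # 7 (markers fit inside the minimum map size)
print(fit(380, 325))  # 8 (markers fit only inside the full image)
```

So fitting the markers into the 240x285 box instead of the full 380x325 image zooms out one level, which is what leaves the margin when the image is cropped.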

Do you have a lot of short, single-use, private functions in your Python code?

Do you have a lot of short, single-use, private functions in your Python code? For example, below is some stubbed out authentication code I've been working on. It checks if a user's password is correct and updates the hash algorithm to use bcrypt. The 4 private functions with the leading underscore are from 1 to 10 lines long and are only used by the check_password function. These functions are part of a larger module with about 20 functions. I don't like that these 4 functions add clutter to the module and are not grouped with the function that uses them, check_password.

def _get_password_hash_from_db(email_address):
    """Get the user's password hash from the database.
    """


def _determine_password_hash_algorithm(password_hash):
    """Determine the hash algorithm.
    """


def _hash_password_old(password):
    """This is the OLD password hash algorithm.
    """


def _hash_existing_password_bcrypt(password, db_password_hash):
    """This is the NEW algorithm used for hashing existing passwords.
    """


def check_password(email_address, password):
    """Check if a user's supplied password is correct.
    """
    db_password_hash = _get_password_hash_from_db(email_address)
    hash_alg = _determine_password_hash_algorithm(db_password_hash)
    if hash_alg == 'BCRYPT':
        input_password_hash = _hash_existing_password_bcrypt(password, db_password_hash)
    else:
        input_password_hash = _hash_password_old(password)
    password_correct = (input_password_hash == db_password_hash)
    if password_correct and hash_alg != 'BCRYPT':
        call_change_password(email_address, password)
    return password_correct


def call_change_password(email_address, new_password):
    """Change the user's password.
    """

Sometimes, in cases like this, I move the 4 private functions to be nested functions inside check_password. I like how the functions are grouped together and that the module is not littered with extraneous functions. However, the inner functions are not easily testable and I don't see many people doing this.

def check_password(email_address, password):
    """Check if a user's supplied password is correct.
    """

    def get_password_hash_from_db(email_address):
        """Get the user's password hash from the database.
        """

    def determine_password_hash_algorithm(password_hash):
        """Determine the hash algorithm.
        """

    def hash_password_old(password):
        """This is the OLD password hash algorithm.
        """

    def hash_existing_password_bcrypt(password, db_password_hash):
        """This is the NEW algorithm used for hashing existing passwords.
        """

    db_password_hash = get_password_hash_from_db(email_address)
    hash_alg = determine_password_hash_algorithm(db_password_hash)
    if hash_alg == 'BCRYPT':
        input_password_hash = hash_existing_password_bcrypt(password, db_password_hash)
    else:
        input_password_hash = hash_password_old(password)
    password_correct = (input_password_hash == db_password_hash)
    if password_correct and hash_alg != 'BCRYPT':
        call_change_password(email_address, password)
    return password_correct


def call_change_password(email_address, new_password):
    """Change the user's password.
    """

Another option is to create a PasswordChecker class instead. This seems the most powerful and now the private methods are testable. However, this adds more overhead and I hear Jack Diederich telling me to Stop Writing Classes!

class _PasswordChecker(object):
    """Check if a user's supplied password is correct.
    """

    @staticmethod
    def _get_password_hash_from_db(email_address):
        """Get the user's password hash from the database.
        """

    @staticmethod
    def _determine_password_hash_algorithm(password_hash):
        """Determine the hash algorithm.
        """

    @staticmethod
    def _hash_password_old(password):
        """This is the OLD password hash algorithm.
        """

    @staticmethod
    def _hash_existing_password_bcrypt(password, db_password_hash):
        """This is the NEW algorithm used for hashing existing passwords.
        """

    def __call__(self, email_address, password):
        db_password_hash = self._get_password_hash_from_db(email_address)
        hash_alg = self._determine_password_hash_algorithm(db_password_hash)
        if hash_alg == 'BCRYPT':
            input_password_hash = self._hash_existing_password_bcrypt(password, db_password_hash)
        else:
            input_password_hash = self._hash_password_old(password)
        password_correct = (input_password_hash == db_password_hash)
        if password_correct and hash_alg != 'BCRYPT':
            call_change_password(email_address, password)
        return password_correct


check_password = _PasswordChecker()


def call_change_password(email_address, new_password):
    """Change the user's password.
    """

Maybe the solution is to break up the module into smaller modules that act like the class above? However, this might leave me with some unevenly sized modules. How do you handle this?

How to expose a Flask local development server to the public using SSH remote port forwarding

Here is how to run a Flask local development server on your local machine and expose it to the public via a remote server you have control over. This uses SSH remote port forwarding, which is the converse of the local port forwarding described here: How to run a Django local development server on a remote machine and access it in your browser on your local machine using SSH port forwarding

  1. On the remote host, edit the sshd_config file (mine was located at /etc/ssh/sshd_config) to allow remote hosts to connect to ports forwarded for the client:
    GatewayPorts yes
  2. On the remote host, restart the SSH server:
    $ sudo service sshd restart 
    
  3. On the local host, SSH to the remote host:
    $ ssh -v -R 50051:localhost:5000 eliot@my.remotehost.com 
    
  4. On the local host, run the Flask dev server:
    $ python runserver.py localhost 5000 
    
  5. Go to http://my.remotehost.com:50051 in the browser
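
Before opening the browser in step 5, it can help to check that the forwarded port accepts connections at all. The following is a minimal sketch, assuming Python 3; the helper name port_is_open is made up for illustration and is not part of Flask or SSH.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        # create_connection raises OSError (or a subclass) when the
        # connection is refused, times out, or the name cannot be resolved.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

With the example values from the steps above, the call would be port_is_open('my.remotehost.com', 50051). A False result usually means the SSH session from step 3 is not running or GatewayPorts is not enabled, though a firewall on the remote host could have the same effect.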

Using RemoteForward in your ~/.ssh/config

You can also achieve the same result by adding a RemoteForward line to your ~/.ssh/config file and then connecting with ssh myremote:

Host myremote
  User eliot
  HostName my.remotehost.com
  RemoteForward 50051 localhost:5000

See also

localtunnel by Jeff Lindsay exposes your local development server without requiring a public remote server.

When is the try-finally block used in Python?

The finally block is used to define clean-up actions. Why is the finally block needed? Why can't the clean-up actions be put after the try/except/else block? That works in some cases, but if there is a return, break, or continue, or an unhandled exception, inside the try, except, or else clauses, code placed after the block will never be executed. The finally block executes even in these conditions.

try:
    print 'Inside try'
    raise Exception
finally:
    print 'Inside finally'
print 'Never get here'

Results:

Inside try
Inside finally
Traceback (most recent call last):
  File "tmp.py", line 13, in <module>
    raise Exception
Exception

Reference: http://docs.python.org/2/tutorial/errors.html#defining-clean-up-actions
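
The return and break cases mentioned above can be sketched in the same way. The function names and the module-level log list are made up for illustration.

```python
log = []


def lookup(key):
    try:
        log.append('try')
        return key.upper()       # leaves the function from inside try
    finally:
        log.append('finally')    # still runs before the caller gets the value


def scan_until_negative(numbers):
    found = None
    for n in numbers:
        try:
            if n < 0:
                found = n
                break            # leaves the loop from inside try
        finally:
            log.append('checked %d' % n)   # runs on every pass, including the break
    return found


result = lookup('a')
negative = scan_until_negative([3, -1, 5])
```

After this runs, log holds 'try', 'finally', 'checked 3', and 'checked -1', in that order. The 5 is never checked because the loop has already exited.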

Using Python's gzip and StringIO to compress data in memory

I needed to gzip some data in memory that would eventually end up saved to disk as a .gz file. I thought, That's easy, just use Python's built-in gzip module.

However, I needed to pass the data to pycurl as a file-like object. I didn't want to write the data to disk and then read it again just to pass it to pycurl. I thought, That's easy too: just use Python's cStringIO module.

The solution did end up being simple, but figuring out the solution was a lot harder than I thought. Below is my roundabout process of finding the simple solution.

Here is my setup/test code. I am running Python 2.7.3 on Ubuntu 12.04.

import cStringIO
import gzip


STUFF_TO_GZIP = """Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?"""
FILENAME = 'myfile.json.gz'


def pycurl_simulator(fileobj):

    # Get the file size
    fileobj.seek(0, 2)
    filesize = fileobj.tell()
    fileobj.seek(0, 0)

    # Read the file data
    fout = open(FILENAME, 'wb')
    fout.write(fileobj.read())
    fout.close()

    return filesize

Try 1: seek from the end fails

Here is my first attempt using cStringIO with the gzip module.

def try1_seek_from_end_fails():

    ftemp = cStringIO.StringIO()
    fgzipped = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=ftemp)
    fgzipped.write(STUFF_TO_GZIP)
    filesize = pycurl_simulator(fgzipped)
    print filesize

I got this exception:

        Traceback (most recent call last):
          File "tmp.py", line 232, in <module>
            try1_seek_from_end_fails()
          File "tmp.py", line 83, in try1_seek_from_end_fails
            filesize = pycurl_simulator(fgzipped)
          File "tmp.py", line 25, in pycurl_simulator
            fileobj.seek(0, 2)
          File "/usr/lib/python2.7/gzip.py", line 415, in seek
            raise ValueError('Seek from end not supported')
        ValueError: Seek from end not supported

It turns out the gzip object doesn't support seeking from the end. See this thread on the Python mailing list: http://mail.python.org/pipermail/python-list/2009-January/519398.html

Try 2: data is not compressed

What if we don't seek() from the end and just tell() where we are? (It should be at the end after doing a write(), right?) Unfortunately, this gave me the uncompressed size.

Reading from the GzipFile object also gave me an error saying that I couldn't read from a writable object.

def try2_data_is_not_compressed():

    ftemp = cStringIO.StringIO()
    fgzipped = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=ftemp)
    fgzipped.write(STUFF_TO_GZIP)
    filesize = fgzipped.tell()
    print filesize

Try 5: file much too small

I googled, then looked at the source code for gzip.py. I found that the compressed data was in the StringIO object. So I performed my file operations on it instead of the GzipFile object. Now I was able to write the data out to a file. However, the size of the file was much too small.

def try5_file_much_too_small():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    filesize = pycurl_simulator(fgz)
    print filesize

Try 6: unexpected end of file

I saw there was a flush() method in the source code, so I added a call to flush(). This time I got a reasonable file size. However, when I tried to gunzip the file from the command line, I got the following error:

        gzip: myfile.json.gz: unexpected end of file

def try6_unexpected_end_of_file():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.flush()
    filesize = pycurl_simulator(fgz)
    print filesize

Try 7: got it working

I knew that GzipFile worked properly when writing files directly as opposed to reading from the StringIO object. It turns out the difference was that there was code in the close() method of GzipFile which wrote some extra required data. Now stuff was working.

def try7_got_it_working():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.flush()

    # Do stuff that GzipFile.close() does
    gzip_obj.fileobj.write(gzip_obj.compress.flush())
    gzip.write32u(gzip_obj.fileobj, gzip_obj.crc)
    gzip.write32u(gzip_obj.fileobj, gzip_obj.size & 0xffffffffL)

    filesize = pycurl_simulator(fgz)
    print filesize
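
For reference, the extra data that close() writes is the gzip trailer: a CRC-32 of the uncompressed data followed by the uncompressed length modulo 2**32, each stored as a 4-byte little-endian integer. The sketch below checks this. It assumes Python 3, where io.BytesIO stands in for cStringIO, and the sample data is made up.

```python
import gzip
import io
import struct
import zlib

data = b'Sed ut perspiciatis unde omnis iste natus error. ' * 20

buf = io.BytesIO()
gz = gzip.GzipFile(filename='myfile.json.gz', mode='wb', fileobj=buf)
gz.write(data)
# At this point the stream has no trailer yet, so it is not a valid .gz file.
size_before_close = len(buf.getvalue())

# close() flushes the compressor and appends the 8-byte trailer.
# buf stays open because it was passed in through fileobj.
gz.close()
compressed = buf.getvalue()

# The last 8 bytes are the CRC-32 and the size, both little-endian.
crc, isize = struct.unpack('<II', compressed[-8:])
trailer_matches = (crc == zlib.crc32(data) & 0xffffffff and isize == len(data))
```

Here trailer_matches ends up True, and size_before_close is smaller than len(compressed), which matches the "unexpected end of file" symptom from Try 6.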

Try 8: (not really) final version

Here's the (not really) final version using a subclass of GzipFile that adds a method to write the extra data at the end. It also overrides close() so that stuff isn't written twice in case you need to use close(). Also, the separate flush() call is not needed.

def try8_not_really_final_version():

    class MemoryGzipFile(gzip.GzipFile):
        """
        A GzipFile subclass designed to be used with in memory file like
        objects, i.e. StringIO objects.
        """

        def write_crc_and_filesize(self):
            """
            Flush and write the CRC and filesize. Normally this is done
            in the close() method. However, for in memory file objects,
            doing this in close() is too late.
            """
            self.fileobj.write(self.compress.flush())
            gzip.write32u(self.fileobj, self.crc)
            # self.size may exceed 2GB, or even 4GB
            gzip.write32u(self.fileobj, self.size & 0xffffffffL)

        def close(self):
            if self.fileobj is None:
                return
            self.fileobj = None
            if self.myfileobj:
                self.myfileobj.close()
                self.myfileobj = None

    fgz = cStringIO.StringIO()
    gzip_obj = MemoryGzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.write_crc_and_filesize()

    filesize = pycurl_simulator(fgz)
    print filesize

Try 9: didn't need to do that (final version)

It turns out I can close the GzipFile object and the StringIO object remains available. So that MemoryGzipFile class above is completely unnecessary. I am dumb. Here is the final iteration:

def try9_didnt_need_to_do_that():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.close()

    filesize = pycurl_simulator(fgz)
    print filesize
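
For anyone on Python 3, the same idea might look roughly like the sketch below. It assumes io.BytesIO in place of cStringIO and bytes input. The helper name gzip_in_memory is made up, and the size check at the end only mimics what pycurl_simulator does above.

```python
import gzip
import io


def gzip_in_memory(data, filename='myfile.json.gz'):
    """Return a BytesIO holding the gzip-compressed form of data (bytes)."""
    buf = io.BytesIO()
    # Leaving the with block closes the GzipFile, which writes the gzip
    # trailer. buf itself stays open because it was passed in via fileobj.
    with gzip.GzipFile(filename=filename, mode='wb', fileobj=buf) as gz:
        gz.write(data)
    buf.seek(0)
    return buf


fileobj = gzip_in_memory(b'some data to compress ' * 50)

# Same size check as pycurl_simulator: seek to the end, tell, rewind.
fileobj.seek(0, 2)
filesize = fileobj.tell()
fileobj.seek(0, 0)
```

The returned object supports seeking from the end, unlike the GzipFile object in Try 1, because it is a plain in-memory buffer.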

How to start a long-running process in screen and detach from it

Here is how to start a long-running process in screen, detach from it, and reattach to it later.

Start a long-running process in screen and detach

  • SSH to the remote host, myremote:
    eliot@mylocal:~$ ssh myremote 
    
  • Start a new screen session
    eliot@myremote:~$ screen 
    
  • Start a long running process, "sleep 3600":
    eliot@myremote:~$ sleep 3600 
    
  • Detach from the screen session:
    eliot@myremote:~$ CTRL-A : detach ENTER 
    
    (Hit [CTRL-A], then type a colon character, then type "detach", then hit [ENTER])
  • Exit your remote SSH session:
    eliot@myremote:~$ exit 
    

Reattach to the existing screen session

  • SSH to the remote host again:
    eliot@mylocal:~$ ssh myremote 
    
  • List your active screen sessions:
    eliot@myremote:~$ screen -ls 
    There is a screen on:
    	11518.pts-1.myremote	(Detached)
    1 Socket in /var/run/screen/S-eliot.
    
  • Reattach to your screen session:
    eliot@myremote:~$ screen -RD 
    
    Note: you don't actually have to use the -RD option. You could use -rD or -r. But I just use -RD all the time. If there is more than one screen session active you will have to say: screen -RD 11518.pts-1.myremote or whichever screen session you want to attach to.
  • It will show you the "sleep 3600" command running. To exit, CTRL-C the sleep process, type "exit" to exit the screen session, and "exit" again to exit the SSH session.

See also