SaltyCrane Blog —  Notes on Python and web development on Ubuntu Linux

Ordered a Lenovo X1 Carbon for 40% off

I ordered a ~$1200 2.8 lb. 14" Lenovo Thinkpad X1 Carbon (2014 2nd generation) today at 40% off for the Black Friday / Cyber Monday sale. I hope I like the keyboard. I splurged on an Intel Core i7-4600U, 8GB RAM, 256GB SSD, and the WQHD 2560x1440 display. I have enjoyed 7 months with my ~$200 2.8 lb. 11" Acer C720 Chromebook. It's a pleasant contrast to my work-issued 5.9 lb. 17" Lenovo Thinkpad W510. Crouton worked well for running Linux on the Chromebook but I wanted a dedicated Linux laptop and a better screen. I will bequeath the C720 to my wife since she currently uses my 7+ year old Dell Inspiron E1405. I considered the 3.5 lb. 14" Lenovo Thinkpad T440s due to better battery/keyboard/flexibility, but the X1 Carbon was ~$200 cheaper for a similarly spec'd model due to a bigger discount and I liked the thinner and lighter design and better display. Many reviews complained about the new keyboard layout and adaptive function row. I hope key remapping will reduce the pain enough. LWN.net editor Jonathan Corbet mentioned that he bought an X1 Carbon in "High-DPI displays and Linux", so it can't be too bad, right?

How to install grunt on Ubuntu 14.04

Grunt is a JavaScript task runner that can be used to compile Sass, run JSHint, or run many other plugins. It depends on the Node.js package manager, npm.

If you use the standard Ubuntu Trusty Tahr repository to install nodejs/npm, you will get this error when running grunt:

/usr/bin/env: node: No such file or directory

Instead, use the chris-lea nodejs repository.

Install nodejs, npm, and grunt-cli

$ sudo add-apt-repository ppa:chris-lea/node.js 
$ sudo apt-get update 
$ sudo apt-get install nodejs 
$ sudo npm install -g grunt-cli 

Install grunt in your project directory

$ cd ~/myproject 
$ echo "{}" > package.json 
$ npm install grunt --save-dev 

Verify grunt is installed

$ nodejs --version 
v0.10.33
$ npm --version 
1.4.28
$ grunt --version 
grunt-cli v0.1.13
grunt v0.4.5

Run a simple grunt task

  1. $ cd ~/myproject
    
  2. Create a package.json file:
    {
      "name": "my-project-name",
      "version": "0.1.0",
      "devDependencies": {
        "grunt": "~0.4.5",
        "grunt-contrib-uglify": "~0.5.0"
      }
    }
  3. Install grunt-contrib-uglify
    $ npm install 
    npm WARN package.json my-project-name@0.1.0 No description
    npm WARN package.json my-project-name@0.1.0 No repository field.
    npm WARN package.json my-project-name@0.1.0 No README data
    grunt-contrib-uglify@0.5.1 node_modules/grunt-contrib-uglify
    ├── chalk@0.5.1 (ansi-styles@1.1.0, escape-string-regexp@1.0.2, supports-color@0.2.0, strip-ansi@0.3.0, has-ansi@0.1.0)
    ├── lodash@2.4.1
    ├── maxmin@0.2.2 (figures@1.3.5, pretty-bytes@0.1.2, gzip-size@0.2.0)
    └── uglify-js@2.4.15 (uglify-to-browserify@1.0.2, async@0.2.10, optimist@0.3.7, source-map@0.1.34)
    
  4. Get an example unminified JS file:
    $ wget http://code.jquery.com/jquery-2.1.1.js 
    --2014-11-22 00:47:31--  http://code.jquery.com/jquery-2.1.1.js
    Resolving code.jquery.com (code.jquery.com)... 94.31.29.53, 94.31.29.230
    Connecting to code.jquery.com (code.jquery.com)|94.31.29.53|:80... connected.
    HTTP request sent, awaiting response... 200 OK
    Length: 247351 (242K) [application/x-javascript]
    Saving to: ‘jquery-2.1.1.js’
    
    100%[================================================================================================================>] 247,351     --.-K/s   in 0.1s    
    
    2014-11-22 00:47:31 (1.71 MB/s) - ‘jquery-2.1.1.js’ saved [247351/247351]
    
  5. Create a Gruntfile.js file:
    module.exports = function(grunt) {
      grunt.initConfig({
        pkg: grunt.file.readJSON('package.json'),
        uglify: {
          build: {
            src: 'jquery-2.1.1.js',
            dest: 'jquery-2.1.1.min.js'
          }
        }
      });
      grunt.loadNpmTasks('grunt-contrib-uglify');
      grunt.registerTask('default', ['uglify']);
    };
  6. Run the grunt task:
    $ grunt 
    Running "uglify:build" (uglify) task
    
    Done, without errors.
    
  7. You should now have a minified file, jquery-2.1.1.min.js
    $ ls -gG jquery* 
    -rw-rw-r-- 1 247351 2014 10/23 17:16 jquery-2.1.1.js
    -rw-rw-r-- 1  84113 2014 11/22 00:48 jquery-2.1.1.min.js
    

The old "%" string formatting and the new string .format() method handle unicode differently

Today I learned that the old style "%" string formatting and the new string .format() method behave differently when interpolating unicode strings. I was surprised to find out that one of these lines raised an error while the other did not:

'%s' % u'O\u2019Connor'
'{}'.format(u'O\u2019Connor')

The old style "%" formatting operation returns a unicode string if one of the values is a unicode string even when the format string is a non-unicode string:

Python 2.7.3 (default, Feb 27 2014, 19:58:35) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> '%s' % u'O\u2019Connor'
u'O\u2019Connor'

The new string .format() method called on a non-unicode string with a unicode string argument tries to encode the unicode string to a non-unicode string (bytestring) possibly raising a UnicodeEncodeError:

Python 2.7.3 (default, Feb 27 2014, 19:58:35) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> '{}'.format(u'O\u2019Connor')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 1: ordinal not in range(128)

I guess the correct thing to do is to start with a unicode format string:

Python 2.7.3 (default, Feb 27 2014, 19:58:35) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> u'{}'.format(u'O\u2019Connor')
u'O\u2019Connor'

Python logging filters do not propagate like handlers and levels do

Loggers are organized in a hierarchical fashion. A logger named 'foo.bar' is a child of a logger named 'foo'.

getLogger() returns a reference to a logger instance with the specified name if it is provided, or root if not. The names are period-separated hierarchical structures. Multiple calls to getLogger() with the same name will return a reference to the same logger object. Loggers that are further down in the hierarchical list are children of loggers higher up in the list. For example, given a logger with a name of foo, loggers with names of foo.bar, foo.bar.baz, and foo.bam are all descendants of foo. - Loggers documentation

If the level is not set on a logger, the level of the parent is used.

Loggers have a concept of effective level. If a level is not explicitly set on a logger, the level of its parent is used instead as its effective level. If the parent has no explicit level set, its parent is examined, and so on - all ancestors are searched until an explicitly set level is found. The root logger always has an explicit level set (WARNING by default). When deciding whether to process an event, the effective level of the logger is used to determine whether the event is passed to the logger’s handlers. - Loggers documentation

import logging

foo_logger = logging.getLogger('foo')
foo_logger.setLevel(20)

foo_bar_logger = logging.getLogger('foo.bar')

print foo_logger.getEffectiveLevel()
print foo_bar_logger.getEffectiveLevel()
20
20
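The ancestor search described in the documentation quote can be sketched a little further. This is a minimal example, assuming three hypothetical logger names (lvl_demo, lvl_demo.mid, lvl_demo.mid.leaf) that are not part of the example above. It shows that the nearest ancestor with an explicit level wins, and that the leaf logger's own level stays unset throughout.

```python
import logging

# Hypothetical logger names, chosen only for this illustration.
top = logging.getLogger('lvl_demo')
mid = logging.getLogger('lvl_demo.mid')
leaf = logging.getLogger('lvl_demo.mid.leaf')

# Only the top-level logger has an explicit level, so the leaf inherits it.
top.setLevel(logging.INFO)
inherited = leaf.getEffectiveLevel()

# Once a nearer ancestor sets a level, that one is found first.
mid.setLevel(logging.ERROR)
overridden = leaf.getEffectiveLevel()

# The leaf's own level was never set; it is still NOTSET (0).
own_level = leaf.level

print(inherited)
print(overridden)
print(own_level)
```
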

Similarly, if a handler is not defined for a logger, the handlers of its ancestors are used: records propagate up the hierarchy to every ancestor's handlers.

Child loggers propagate messages up to the handlers associated with their ancestor loggers. Because of this, it is unnecessary to define and configure handlers for all the loggers an application uses. It is sufficient to configure handlers for a top-level logger and create child loggers as needed. (You can, however, turn off propagation by setting the propagate attribute of a logger to False.) - Loggers documentation

import logging

myformatter = logging.Formatter("MY HANDLER: %(name)s - %(message)s")

myhandler = logging.StreamHandler()
myhandler.setFormatter(myformatter)

foo_logger = logging.getLogger('foo')
foo_logger.addHandler(myhandler)

foo_bar_logger = logging.getLogger('foo.bar')

foo_logger.error('asdfasdf')
foo_bar_logger.error('zxcvzxcv')
MY HANDLER: foo - asdfasdf
MY HANDLER: foo.bar - zxcvzxcv

However, filters, unlike levels and handlers, do not propagate. If a filter is not defined for a logger, the filter of the parent is NOT used.

Note that filters attached to handlers are consulted before an event is emitted by the handler, whereas filters attached to loggers are consulted whenever an event is logged (using debug(), info(), etc.), before sending an event to handlers. This means that events which have been generated by descendant loggers will not be filtered by a logger’s filter setting, unless the filter has also been applied to those descendant loggers. - Filter Objects documentation
See also the logging flowchart.

import logging

class MyFilter(logging.Filter):
    def filter(self, record):
        record.msg = 'MY FILTER: ' + record.msg
        return 1

myfilter = MyFilter()

myformatter = logging.Formatter("MY HANDLER: %(name)s - %(message)s")

myhandler = logging.StreamHandler()
myhandler.setFormatter(myformatter)

foo_logger = logging.getLogger('foo')
foo_logger.addFilter(myfilter)
foo_logger.addHandler(myhandler)

foo_bar_logger = logging.getLogger('foo.bar')

foo_logger.error('asdfasdf')
foo_bar_logger.error('zxcvzxcv')
MY HANDLER: foo - MY FILTER: asdfasdf
MY HANDLER: foo.bar - zxcvzxcv

I guess I'll attach it to the handler instead...

import logging

class MyFilter(logging.Filter):
    def filter(self, record):
        record.msg = 'MY FILTER: ' + record.msg
        return 1

myfilter = MyFilter()

myformatter = logging.Formatter("MY HANDLER: %(name)s - %(message)s")

myhandler = logging.StreamHandler()
myhandler.setFormatter(myformatter)
myhandler.addFilter(myfilter)

foo_logger = logging.getLogger('foo')
foo_logger.addHandler(myhandler)

foo_bar_logger = logging.getLogger('foo.bar')

foo_logger.error('asdfasdf')
foo_bar_logger.error('zxcvzxcv')
MY HANDLER: foo - MY FILTER: asdfasdf
MY HANDLER: foo.bar - MY FILTER: zxcvzxcv
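The two placements can also be compared side by side in a form that is easy to assert on. This is only a sketch: TagFilter, ListHandler, run_demo, and the demo_* logger names are hypothetical helpers invented for this comparison, and the handler collects messages in a list instead of writing to stderr.

```python
import logging


class TagFilter(logging.Filter):
    """Hypothetical filter that prefixes every record it sees."""

    def filter(self, record):
        record.msg = 'TAGGED: ' + str(record.msg)
        return True


class ListHandler(logging.Handler):
    """Hypothetical handler that stores formatted messages in a list."""

    def __init__(self):
        logging.Handler.__init__(self)
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


def run_demo(attach_to):
    """Attach the filter to the parent 'logger' or to its 'handler'."""
    handler = ListHandler()
    handler.setFormatter(logging.Formatter('%(name)s - %(message)s'))

    parent = logging.getLogger('demo_%s' % attach_to)
    parent.propagate = False  # keep the root logger out of the demo
    parent.addHandler(handler)
    child = logging.getLogger('demo_%s.child' % attach_to)

    if attach_to == 'logger':
        parent.addFilter(TagFilter())
    else:
        handler.addFilter(TagFilter())

    parent.error('from parent')
    child.error('from child')
    return handler.messages


logger_result = run_demo('logger')
handler_result = run_demo('handler')
print(logger_result)
print(handler_result)
```

With the filter on the logger, only the record logged directly on the parent is tagged. With the filter on the handler, both records are tagged, because the child's record still passes through the parent's handler.
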

Subdomain-based configuration for a Flask local development server

This example shows how to set up a Flask local development server to use a different configuration based on the subdomain of the request. The project I work on has several environments (dev, qa, staging, etc). Each environment has different database and API hostnames. I use this to switch between database and API environments quickly while using my local development server.

This assumes a create_app function is used to create the Flask application instance as described in the Application Factories Flask documentation.

create_app

Modify the create_app function to take a configobj argument and use it to override the default configuration.

from flask import Flask


def create_app(configobj=None):
    app = Flask(__name__)

    # Default configuration
    app.config.from_object(__name__)

    # Override configuration using config passed into create_app
    if configobj:
        app.config.from_object(configobj)

    return app

SubdomainDispatcher

The SubdomainDispatcher is taken from the Application Dispatching Flask documentation. It is WSGI middleware that looks at the subdomain of the request and returns a different application instance for each subdomain. It calls the create_app function above and passes it the appropriate configuration object for the subdomain.

from threading import Lock


class SubdomainDispatcher(object):

    def __init__(self, create_app, domain=''):
        """
        :param create_app: a function that returns a `flask.Flask` instance
        :param domain: str - used to determine the subdomain
        """
        self.create_app = create_app
        self.domain = domain
        self.lock = Lock()
        self.instances = {}

    def __call__(self, environ, start_response):
        app = self._get_application(environ['HTTP_HOST'])
        return app(environ, start_response)

    def _get_application(self, host):
        host = host.split(':')[0]
        assert host.endswith(self.domain), 'Configuration error'
        subdomain = host[:-len(self.domain)].rstrip('.')
        with self.lock:
            app = self.instances.get(subdomain)
            if app is None:
                configobj = self._get_subdomain_based_config(subdomain)
                app = self.create_app(configobj=configobj)
                self.instances[subdomain] = app
            return app

    @staticmethod
    def _get_subdomain_based_config(subdomain):

        class Config(object):
            pass
        config = Config()

        if subdomain == 'dev':
            config.API_HOST = 'dev-host'
            config.DB_SERVER = 'dev-db-server'
        elif subdomain == 'qa':
            config.API_HOST = 'qa-host'
            config.DB_SERVER = 'qa-db-server'

        return config
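The subdomain parsing inside _get_application can be tried on its own, without Flask. The sketch below pulls the same three string operations into a hypothetical standalone function, extract_subdomain, which is not part of the dispatcher. It raises ValueError where the dispatcher uses an assert.

```python
def extract_subdomain(host, domain):
    """Return the part of `host` in front of `domain`.

    Hypothetical helper mirroring the string handling in
    SubdomainDispatcher._get_application.
    """
    # Drop the port, e.g. 'dev.localhost:5000' -> 'dev.localhost'
    host = host.split(':')[0]
    if not host.endswith(domain):
        raise ValueError('host %r is not under domain %r' % (host, domain))
    # Cut off the domain, then the dot that separated it from the subdomain
    return host[:-len(domain)].rstrip('.')


dev = extract_subdomain('dev.localhost:5000', 'localhost')
qa = extract_subdomain('qa.localhost', 'localhost')
bare = extract_subdomain('localhost:5000', 'localhost')
print(dev)
print(qa)
print(repr(bare))
```

A request to the bare domain yields an empty subdomain, so in the dispatcher it gets a Config object with no overrides and the app keeps its default configuration.
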

rundevserver

rundevserver is similar to flask.Flask.run but uses the SubdomainDispatcher middleware before calling werkzeug.serving.run_simple.

def rundevserver(host=None, port=None, domain='', debug=True, **options):
    """
    Modified from `flask.Flask.run`

    Runs the application on a local development server.

    :param host: the hostname to listen on. Set this to ``'0.0.0.0'`` to
                 have the server available externally as well. Defaults to
                 ``'127.0.0.1'``.
    :param port: the port of the webserver. Defaults to ``5000``
    :param domain: used to determine the subdomain
    :param debug: if given, enable or disable debug mode.
                  See :attr:`debug`.
    :param options: the options to be forwarded to the underlying
                    Werkzeug server. See
                    :func:`werkzeug.serving.run_simple` for more
                    information.
    """
    from werkzeug.serving import run_simple

    if host is None:
        host = '127.0.0.1'
    if port is None:
        port = 5000
    options.setdefault('use_reloader', debug)
    options.setdefault('use_debugger', debug)

    app = SubdomainDispatcher(create_app, domain)

    run_simple(host, port, app, **options)

Usage

  1. Add the following to your hosts file (/etc/hosts on Ubuntu):
    0.0.0.0 dev.localhost
    0.0.0.0 qa.localhost
  2. Run the local dev server:
    if __name__ == '__main__':
        rundevserver(host='0.0.0.0', port=5000, domain='localhost')
    
  3. Use the following URLs to get different app configurations:

Github code

A full working example is located on github: flask-subdomain-dispatcher-example

How to add a margin around markers in the Google Static Maps API using Python

This example shows how to use Python to generate a Google Static Map URL for a map that contains markers within some dimensions which are smaller than the map image dimensions. This effectively allows for setting minimum X and Y margins around the markers in a map. This is useful for a "fluid" web design where a maximum map size is requested from Google and is then cut off at the edges for small browser windows.

The bulk of this solution is based on the Javascript code here: http://stackoverflow.com/questions/6048975/google-maps-v3-how-to-calculate-the-zoom-level-for-a-given-bounds

import math


def generate_map_url(
        min_map_width_px,
        max_map_width_px,
        min_map_height_px,
        max_map_height_px,
        marker_groups):
    """
    Return a Google Static Map URL for a map that contains markers within
    some dimensions which are smaller than the map image dimensions. This
    effectively allows for setting minimum X and Y margins around the markers
    in a map. This is useful for a "fluid" web design where a maximum map
    size is requested from Google and is then cut off at the edges for
    small browser windows.
    """
    # Determine the maximum zoom to contain markers at the minimum map size
    lat_list = [
        lat for markers in marker_groups for lat, lng in markers['lat_lng']]
    lng_list = [
        lng for markers in marker_groups for lat, lng in markers['lat_lng']]
    max_zoom = get_zoom_to_fit(
        min(lat_list), max(lat_list), min(lng_list), max(lng_list),
        min_map_width_px, min_map_height_px,
    )

    # Build the markers query string arguments
    markers_args = ''
    for markers in marker_groups:
        lat_lng = '|'.join([
            '{},{}'.format(lat, lng) for lat, lng in markers['lat_lng']])
        markers_args += '&markers=color:{}|{}'.format(markers['color'], lat_lng)

    # Build and return the map URL
    return ''.join([
        'http://maps.googleapis.com/maps/api/staticmap',
        '?sensor=false&v=3&visual_refresh=true',
        '&size={}x{}&zoom={}'.format(
            max_map_width_px, max_map_height_px, max_zoom),
        markers_args,
    ])


def get_zoom_to_fit(min_lat, max_lat, min_lng, max_lng, width_px, height_px):
    """
    Return the maximum zoom that will fit the given min/max lat/lng
    coordinates in a map of the given dimensions. This is used to
    override the zoom set by Google's implicit positioning.

    Calculation translated from Javascript to Python from:
    http://stackoverflow.com/questions/6048975/google-maps-v3-how-to-calculate-the-zoom-level-for-a-given-bounds
    """
    GOOGLE_WORLD_WIDTH = 256
    GOOGLE_WORLD_HEIGHT = 256
    MAX_ZOOM = 17

    def lat2rad(lat):
        sinlat = math.sin(math.radians(lat))
        radx2 = math.log((1 + sinlat) / (1 - sinlat)) / 2.0
        return max(min(radx2, math.pi), -math.pi) / 2.0

    def zoom(map_px, world_px, fraction):
        # Use int() to round down to the nearest integer
        return int(
            math.log(float(map_px) / float(world_px) / fraction)
            / math.log(2.0)
        )

    # Determine the maximum zoom based on height and latitude
    if min_lat == max_lat:
        lat_zoom = MAX_ZOOM
    else:
        lat_fraction = (lat2rad(max_lat) - lat2rad(min_lat)) / math.pi
        lat_zoom = zoom(height_px, GOOGLE_WORLD_HEIGHT, lat_fraction)

    # Determine the maximum zoom based on width and longitude
    if min_lng == max_lng:
        lng_zoom = MAX_ZOOM
    else:
        lng_range = max_lng - min_lng
        if lng_range < 0:
            lng_range += 360.0
        lng_fraction = lng_range / 360.0
        lng_zoom = zoom(width_px, GOOGLE_WORLD_WIDTH, lng_fraction)

    return min(lat_zoom, lng_zoom, MAX_ZOOM)

Here is an example:

map_url = generate_map_url(
    min_map_width_px=240, max_map_width_px=380,
    min_map_height_px=285, max_map_height_px=325,
    marker_groups=[
        {'color': 'blue',
         'lat_lng': [(34.0993, -118.8394)]},
        {'color': 'orange',
         'lat_lng': [
             (34.3997, -119.2002),
             (34.5389, -118.4499),
             (34.0983, -118.1285),
             (33.5932, -117.9455),
             (33.8322, -117.3958),
         ]}
    ]
)
print map_url

Here is a map without the margin: http://maps.googleapis.com/maps/api/staticmap?sensor=false&v=3&visual_refresh=true&size=380x325&markers=color:blue|34.0993,-118.8394&markers=color:orange|34.3997,-119.2002|34.5389,-118.4499|34.0983,-118.1285|33.5932,-117.9455|33.8322,-117.3958

Here is the result with the margin: http://maps.googleapis.com/maps/api/staticmap?sensor=false&v=3&visual_refresh=true&size=380x325&zoom=7&markers=color:blue|34.0993,-118.8394&markers=color:orange|34.3997,-119.2002|34.5389,-118.4499|34.0983,-118.1285|33.5932,-117.9455|33.8322,-117.3958
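The zoom=7 in the second URL can be checked by hand for the longitude direction. The sketch below repeats only the longitude half of get_zoom_to_fit in a hypothetical standalone function, lng_zoom, and feeds it the westernmost and easternmost markers from the example along with the minimum map width of 240 px.

```python
import math

GOOGLE_WORLD_WIDTH = 256  # width in pixels of the whole world at zoom 0


def lng_zoom(min_lng, max_lng, width_px):
    """Largest integer zoom at which the longitude span fits in width_px.

    Hypothetical helper; same arithmetic as the longitude branch of
    get_zoom_to_fit.
    """
    lng_range = max_lng - min_lng
    if lng_range < 0:
        lng_range += 360.0
    fraction = lng_range / 360.0
    # At zoom z the world is 256 * 2**z pixels wide, so the span covers
    # fraction * 256 * 2**z pixels. Solve for z and round down.
    return int(
        math.log(width_px / (GOOGLE_WORLD_WIDTH * fraction)) / math.log(2.0))


zoom_at_min_width = lng_zoom(-119.2002, -117.3958, 240)
zoom_at_max_width = lng_zoom(-119.2002, -117.3958, 380)
print(zoom_at_min_width)  # 7
print(zoom_at_max_width)  # 8
```

Fitting the markers into the 240 px minimum width gives zoom 7, while the full 380 px image would allow zoom 8. Requesting the larger image at the smaller zoom is what produces the margin.
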

Do you have a lot of short, single-use, private functions in your Python code?

Do you have a lot of short, single-use, private functions in your Python code? For example, below is some stubbed out authentication code I've been working on. It checks if a user's password is correct and updates the hash algorithm to use bcrypt. The 4 private functions with the leading underscore are from 1 to 10 lines long and are only used by the check_password function. These functions are part of a larger module with about 20 functions. I don't like that these 4 functions add clutter to the module and are not grouped with the function that uses them, check_password.

def _get_password_hash_from_db(email_address):
    """Get the user's password hash from the database.
    """


def _determine_password_hash_algorithm(password_hash):
    """Determine the hash algorithm.
    """


def _hash_password_old(password):
    """This is the OLD password hash algorithm.
    """


def _hash_existing_password_bcrypt(password, db_password_hash):
    """This is the NEW algorithm used for hashing existing passwords.
    """


def check_password(email_address, password):
    """Check if a user's supplied password is correct.
    """
    db_password_hash = _get_password_hash_from_db(email_address)
    hash_alg = _determine_password_hash_algorithm(db_password_hash)
    if hash_alg == 'BCRYPT':
        input_password_hash = _hash_existing_password_bcrypt(password, db_password_hash)
    else:
        input_password_hash = _hash_password_old(password)
    password_correct = (input_password_hash == db_password_hash)
    if password_correct and hash_alg != 'BCRYPT':
        call_change_password(email_address, password)
    return password_correct


def call_change_password(email_address, new_password):
    """Change the user's password.
    """

Sometimes, in cases like this, I move the 4 private functions to be nested functions inside check_password. I like how the functions are grouped together and that the module is not littered with extraneous functions. However, the inner functions are not easily testable and I don't see many people doing this.

def check_password(email_address, password):
    """Check if a user's supplied password is correct.
    """

    def get_password_hash_from_db(email_address):
        """Get the user's password hash from the database.
        """

    def determine_password_hash_algorithm(password_hash):
        """Determine the hash algorithm.
        """

    def hash_password_old(password):
        """This is the OLD password hash algorithm.
        """

    def hash_existing_password_bcrypt(password, db_password_hash):
        """This is the NEW algorithm used for hashing existing passwords.
        """

    db_password_hash = get_password_hash_from_db(email_address)
    hash_alg = determine_password_hash_algorithm(db_password_hash)
    if hash_alg == 'BCRYPT':
        input_password_hash = hash_existing_password_bcrypt(password, db_password_hash)
    else:
        input_password_hash = hash_password_old(password)
    password_correct = (input_password_hash == db_password_hash)
    if password_correct and hash_alg != 'BCRYPT':
        call_change_password(email_address, password)
    return password_correct


def call_change_password(email_address, new_password):
    """Change the user's password.
    """

Another option is to create a PasswordChecker class instead. This seems the most powerful and now the private methods are testable. However, this adds more overhead and I hear Jack Diederich telling me to Stop Writing Classes!

class _PasswordChecker(object):
    """Check if a user's supplied password is correct.
    """

    @staticmethod
    def _get_password_hash_from_db(email_address):
        """Get the user's password hash from the database.
        """

    @staticmethod
    def _determine_password_hash_algorithm(password_hash):
        """Determine the hash algorithm.
        """

    @staticmethod
    def _hash_password_old(password):
        """This is the OLD password hash algorithm.
        """

    @staticmethod
    def _hash_existing_password_bcrypt(password, db_password_hash):
        """This is the NEW algorithm used for hashing existing passwords.
        """

    def __call__(self, email_address, password):
        db_password_hash = self._get_password_hash_from_db(email_address)
        hash_alg = self._determine_password_hash_algorithm(db_password_hash)
        if hash_alg == 'BCRYPT':
            input_password_hash = self._hash_existing_password_bcrypt(password, db_password_hash)
        else:
            input_password_hash = self._hash_password_old(password)
        password_correct = (input_password_hash == db_password_hash)
        if password_correct and hash_alg != 'BCRYPT':
            call_change_password(email_address, password)
        return password_correct


check_password = _PasswordChecker()


def call_change_password(email_address, new_password):
    """Change the user's password.
    """

Maybe the solution is to break up the module into smaller modules which act like the class above? However this might leave me with some unevenly sized modules. How do you handle this?

How to expose a Flask local development server to the public using SSH remote port forwarding

Here is how to run a Flask local development server on your local machine and expose it to the public via a remote server you have control over. This uses SSH remote port forwarding, which is the converse of the local port forwarding described here: How to run a Django local development server on a remote machine and access it in your browser on your local machine using SSH port forwarding

  1. On the remote host, edit the sshd_config file (mine was located at /etc/ssh/sshd_config) to allow remote hosts to connect to ports forwarded for the client:
    GatewayPorts yes
  2. On the remote host, restart the SSH server:
    $ sudo service sshd restart 
    
  3. On the local host, SSH to the remote host:
    $ ssh -v -R 50051:localhost:5000 eliot@my.remotehost.com 
    
  4. On the local host, run the Flask dev server:
    $ python runserver.py localhost 5000 
    
  5. Go to http://my.remotehost.com:50051 in the browser

Using RemoteForward in your ~/.ssh/config

You can also achieve the same result by using the RemoteForward option in your ~/.ssh/config file:

Host myremote
  User eliot
  HostName my.remotehost.com
  RemoteForward 50051 localhost:5000

See also

localtunnel by Jeff Lindsay exposes your local development server without requiring a public remote server.

When is the try-finally block used in Python?

The finally block is used to define clean-up actions. Why is the finally block needed? Why can't the clean up actions be put after the try/except/else block? This works in some cases, but if there is a return, break, or continue, or an unhandled exception inside the try, except, or else clauses, that code will never be executed. The finally block executes even in these conditions.

try:
    print 'Inside try'
    raise Exception
finally:
    print 'Inside finally'
print 'Never get here'

Results:

Inside try
Inside finally
Traceback (most recent call last):
  File "tmp.py", line 13, in <module>
    raise Exception
Exception

Reference: http://docs.python.org/2/tutorial/errors.html#defining-clean-up-actions
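The same holds for return and break. Here is a small sketch, using a hypothetical events list to record the order in which things happen. The function names are made up for the illustration.

```python
events = []


def return_inside_try():
    try:
        events.append('try')
        return 'value'
    finally:
        # Runs after the return value is computed, before the caller gets it
        events.append('finally')
    events.append('after')  # never reached because of the return


def break_inside_try():
    for item in ['a', 'b', 'c']:
        try:
            events.append('loop ' + item)
            break
        finally:
            # Runs even though break leaves the loop on the first pass
            events.append('cleanup ' + item)


result = return_inside_try()
break_inside_try()
print(result)
print(events)
```
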

Using Python's gzip and StringIO to compress data in memory

I needed to gzip some data in memory that would eventually end up saved to disk as a .gz file. I thought, That's easy, just use Python's built in gzip module.

However, I needed to pass the data to pycurl as a file-like object. I didn't want to write the data to disk and then read it again just to pass to pycurl. I thought, That's easy also-- just use Python's cStringIO module.

The solution did end up being simple, but figuring out the solution was a lot harder than I thought. Below is my roundabout process of finding the simple solution.

Here is my setup/test code. I am running Python 2.7.3 on Ubuntu 12.04.

import cStringIO
import gzip


STUFF_TO_GZIP = """Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?"""
FILENAME = 'myfile.json.gz'


def pycurl_simulator(fileobj):

    # Get the file size
    fileobj.seek(0, 2)
    filesize = fileobj.tell()
    fileobj.seek(0, 0)

    # Read the file data
    fout = open(FILENAME, 'wb')
    fout.write(fileobj.read())
    fout.close()

    return filesize

Try 1: seek from the end fails

Here is my first attempt using cStringIO with the gzip module.

def try1_seek_from_end_fails():

    ftemp = cStringIO.StringIO()
    fgzipped = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=ftemp)
    fgzipped.write(STUFF_TO_GZIP)
    filesize = pycurl_simulator(fgzipped)
    print filesize

I got this exception:

        Traceback (most recent call last):
          File "tmp.py", line 232, in <module>
            try1_seek_from_end_fails()
          File "tmp.py", line 83, in try1_seek_from_end_fails
            filesize = pycurl_simulator(fgzipped)
          File "tmp.py", line 25, in pycurl_simulator
            fileobj.seek(0, 2)
          File "/usr/lib/python2.7/gzip.py", line 415, in seek
            raise ValueError('Seek from end not supported')
        ValueError: Seek from end not supported

It turns out the gzip object doesn't support seeking from the end. See this thread on the Python mailing list: http://mail.python.org/pipermail/python-list/2009-January/519398.html

Try 2: data is not compressed

What if we don't seek() from the end and just tell() where we are? (It should be at the end after doing a write(), right?) Unfortunately, this gave me the uncompressed size.

Reading from the GzipFile object also gave me an error saying that I couldn't read from a writable object.

def try2_data_is_not_compressed():

    ftemp = cStringIO.StringIO()
    fgzipped = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=ftemp)
    fgzipped.write(STUFF_TO_GZIP)
    filesize = fgzipped.tell()
    print filesize

Try 5: file much too small

I googled, then looked at the source code for gzip.py. I found that the compressed data was in the StringIO object. So I performed my file operations on it instead of the GzipFile object. Now I was able to write the data out to a file. However, the size of the file was much too small.

def try5_file_much_too_small():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    filesize = pycurl_simulator(fgz)
    print filesize

Try 6: unexpected end of file

I saw there was a flush() method in the source code. I added a call to flush(). This time, I got a reasonable file size, however, when trying to gunzip it from the command line, I got the following error:

        gzip: myfile.json.gz: unexpected end of file

def try6_unexpected_end_of_file():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.flush()
    filesize = pycurl_simulator(fgz)
    print filesize

Try 7: got it working

I knew that GzipFile worked properly when writing files directly as opposed to reading from the StringIO object. It turns out the difference was that there was code in the close() method of GzipFile which wrote some extra required data. Now stuff was working.

def try7_got_it_working():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.flush()

    # Do stuff that GzipFile.close() does
    gzip_obj.fileobj.write(gzip_obj.compress.flush())
    gzip.write32u(gzip_obj.fileobj, gzip_obj.crc)
    gzip.write32u(gzip_obj.fileobj, gzip_obj.size & 0xffffffffL)

    filesize = pycurl_simulator(fgz)
    print filesize

Try 8: (not really) final version

Here's the (not really) final version using a subclass of GzipFile that adds a method to write the extra data at the end. It also overrides close() so that stuff isn't written twice in case you need to use close(). Also, the separate flush() call is not needed.

def try8_not_really_final_version():

    class MemoryGzipFile(gzip.GzipFile):
        """
        A GzipFile subclass designed to be used with in memory file like
        objects, i.e. StringIO objects.
        """

        def write_crc_and_filesize(self):
            """
            Flush and write the CRC and filesize. Normally this is done
            in the close() method. However, for in memory file objects,
            doing this in close() is too late.
            """
            self.fileobj.write(self.compress.flush())
            gzip.write32u(self.fileobj, self.crc)
            # self.size may exceed 2GB, or even 4GB
            gzip.write32u(self.fileobj, self.size & 0xffffffffL)

        def close(self):
            if self.fileobj is None:
                return
            self.fileobj = None
            if self.myfileobj:
                self.myfileobj.close()
                self.myfileobj = None

    fgz = cStringIO.StringIO()
    gzip_obj = MemoryGzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.write_crc_and_filesize()

    filesize = pycurl_simulator(fgz)
    print filesize

Try 9: didn't need to do that (final version)

It turns out I can close the GzipFile object and the StringIO object remains available. So that MemoryGzipFile class above is completely unnecessary. I am dumb. Here is the final iteration:

def try9_didnt_need_to_do_that():

    fgz = cStringIO.StringIO()
    gzip_obj = gzip.GzipFile(
        filename=FILENAME, mode='wb', fileobj=fgz)
    gzip_obj.write(STUFF_TO_GZIP)
    gzip_obj.close()

    filesize = pycurl_simulator(fgz)
    print filesize
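For comparison, here is a sketch of the same idea on Python 3, which is an assumption on my part since everything above was run on Python 2.7.3. There io.BytesIO takes the place of cStringIO and the data must be bytes. The function name gzip_in_memory is hypothetical. The key point is unchanged: close the GzipFile first, then use the underlying buffer.

```python
import gzip
import io


def gzip_in_memory(data, filename=''):
    """Return a BytesIO holding `data` (bytes) compressed as a gzip file."""
    buf = io.BytesIO()
    gz = gzip.GzipFile(filename=filename, mode='wb', fileobj=buf)
    try:
        gz.write(data)
    finally:
        # close() writes the remaining compressed data plus the CRC and
        # size trailer. It does not close `buf`, since we passed it in.
        gz.close()
    buf.seek(0)
    return buf


payload = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem ' * 20
fileobj = gzip_in_memory(payload, filename='myfile.json.gz')

# Size of the compressed data, found the same way pycurl_simulator does
fileobj.seek(0, 2)
compressed_size = fileobj.tell()
fileobj.seek(0, 0)

# Round trip to confirm the buffer holds a complete gzip stream
roundtrip = gzip.decompress(fileobj.read())
print(compressed_size < len(payload))
print(roundtrip == payload)
```
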
