SaltyCrane Blog — Notes on JavaScript and web development

Setting up a Linux DVR w/ MythTV, Ubuntu 12.04, and a Hauppauge WinTV-HVR 1250 TV tuner card

Setting up MythTV involves a little pain, but once it's set up, it's pretty great. And you don't have to spend lots of money on a DVR from the cable company. With my modest hardware specs, playback is smooth and clear, however Picture in Picture is too jittery to be useful. Here's what I did to get my MythTV DVR running on my Ubuntu machine.

Parameters

Install the Hauppauge WinTV-HVR 1250 TV tuner card

Put the card in the computer. Connect the TV antenna to the card.

Check the TV tuner card is recognized

Ubuntu 12.04 includes drivers for the Hauppauge 1250 TV tuner card, so I did not need to install any drivers.

Update 2016-10-16:On Ubuntu 16.04, look in /var/log/syslog instead of /var/log/dmesg.

$ cat /var/log/dmesg
[   15.211985] cx23885 driver version 0.0.3 loaded
[   15.214279] cx23885 0000:03:00.0: PCI INT A -> GSI 17 (level, low) -> IRQ 17
[   15.214492] CORE cx23885[0]: subsystem: 0070:2259, board: Hauppauge WinTV-HVR1255 [card=20,autodetected]
[   15.214600] IR NEC protocol handler initialized
[   15.230936] IR RC5(x) protocol handler initialized
[   15.235576] MCE: In-kernel MCE decoding enabled.
[   15.237132] IR RC6 protocol handler initialized
[   15.237703] EDAC MC: Ver: 2.1.0
[   15.238256] AMD64 EDAC driver v3.4.0
[   15.242493] IR JVC protocol handler initialized
[   15.246743] IR Sony protocol handler initialized
[   15.250908] IR MCE Keyboard/mouse protocol handler initialized
[   15.256862] lirc_dev: IR Remote Control driver registered, major 250 
[   15.257125] IR LIRC bridge handler initialized
[   15.284735] lp0: using parport0 (interrupt-driven).
[   15.361892] tveeprom 0-0050: Hauppauge model 22111, rev E2F5, serial# 8323201
[   15.361895] tveeprom 0-0050: MAC address is 00:0d:fe:7f:00:81
[   15.361897] tveeprom 0-0050: tuner model is NXP 18271C2 (idx 155, type 54)
[   15.361899] tveeprom 0-0050: TV standards NTSC(M) ATSC/DVB Digital (eeprom 0x88)
[   15.361901] tveeprom 0-0050: audio processor is CX23888 (idx 40)
[   15.361903] tveeprom 0-0050: decoder processor is CX23888 (idx 34)
[   15.361904] tveeprom 0-0050: has no radio, has IR receiver, has no IR transmitter
[   15.361906] cx23885[0]: hauppauge eeprom: model=22111
[   15.361909] cx23885_dvb_register() allocating 1 frontend(s)

Install MythTV

$ sudo apt-get install mythtv

Set up the MythTV backend

Run mythtv-setup to select your TV tuner card and scan for channels.

$ mythtv-setup

Click "Yes" to add your user to the "mythtv" group.

Click "Yes" to restart your login session.

Change the following options:

  • 2. Capture cards -> (New capture card) -> Card type: DVB DTV capture card (v3.x) -> Finish
  • 4. Video sources -> (New video source) -> Video source name: FOOBAR, Listings grabber: North America (SchedulesDirect.org) (Internal), User ID: blank, Pass: blank
  • 5. Input connections -> [DVB: /dev/dvb/adapter0/frontend0] -> Video source: FOOBAR -> Scan for channels
    Note: if you see the error "Failed to open the card", make sure the mythv-backend is actually stopped. The GUI said it would stop the backend but it was still running.

After running mythtv-setup, it will ask you if you want to start the backend. Select yes to start the backend. It will also ask you if you want to run mythfilldatabase. Select yes to run mythfilldatabase. This may take a while.

Ensure mythv backend is running

After running mythtv-setup, the mythtv backend should start running.

To check that the backend is running, run:

$ ps -ef | grep myth

If the mythtv backend is not running, start it using the following command:

$ sudo service mythtv-backend start

Troubleshooting mythbackend

If mythbackend doesn't stay running, there may be some configuration that is broken. Check /var/log/syslog. If that does not have enough information, run the backend with the --verbose option:

$ mythbackend --verbose

Run the MythTV frontend

$ mythfrontend

Some keyboard shortcuts

  • P - pause/play
  • SPACE - set/clear bookmark
  • LEFT/RIGHT ARROW - skip back/forward
  • M - menu
  • D - delete

Other stuff

  • You may want to change the theme. I chose the TintedGlass 2.43 theme.
  • To get schedule information, I ended up signing up for a membership at www.schedulesdirect.org. It is $25/year (or ~$2/month). It seems to be the recommended way to get schedule information.

How run mythfrontend on another Ubuntu laptop connected to your LAN (Added 2013-06-07)


Since MythTV has a flexible client/server architecture, you can run the MythTV backend server on one machine and access it from multiple other machines running a Mythtv frontend. These steps assume the remote frontend is running on a laptop with Ubuntu 12.04 and it is connected to your local network (LAN) (not through the internet (though that is possible.).)

UPDATE: Playing 1080p HD content over my $30 Belkin G wireless router (rated at 54 Mbps) had occasional stalls in the playback. Repositioning my router helped, but after a couple days, I decided to order a Netgear N600 Wireless-N Dual Band Router. Hopefully this will solve my problem.

On the Mythtv backend server configured above:

  • Determine the IP address of the Mythtv backend server by running ifconfig
    $ ifconfig 
    For me, it is 192.168.2.2. This will be used in the steps below.
  • Follow the instructions here: http://www.mythtv.org/wiki/Mythfrontend
    • Edit /etc/mysql/my.cnf so the the bind-address line is commented out (on Ubuntu 16.04 edit /etc/mysql/mysql.conf.d/mysqld.cnf):
      #bind-address 127.0.0.1
    • Allow remote users access to the database. Note: replace "mypassword" with the value found in ~/.mythtv/mysql.txt.
      $ mysql -u root
      mysql> grant all on mythconverg.* to 'mythtv'@'%' identified by 'mypassword';
      mysql> flush privileges;
      mysql> exit
    • Restart mysql server:
      $ sudo service mysql restart 
  • Ensure mythbackend is not using 127.0.0.1.
    • Run mythtv-setup:
      $ mythtv-setup 
    • Change the IP address from 127.0.0.1 to 192.168.2.2 (or IP address you determined from above.)

On the laptop:

  • $ sudo apt-get install mythtv-frontend 
  • click "yes" to be added to the mythtv group
  • click "yes" to restart your session
  • click "OK" to the msg about logging out of your session
  • logout and login again
  • Run mythfrontend
    $ mythfrontend 
    
  • For the hostname: enter the IP address of the Mythtv server. For me it is 192.168.2.2.
  • Enter the Mysql password. This can be found in ~/.mythtv/mysql.txt (or ~/.mythtv/config.xml or /etc/mythtv/config.xml on newer versions) on the Mythtv server machine. Or you can check the settings of the mythfrontend running on the server machine.

How to watch your recorded videos on your Android phone over the internet


  • This method uses the MythTV Services API
  • PC: Set up a SSH server on your MythTV backend server
  • PC: Get the external IP address of your MythTV backend server
    $ curl http://ifconfig.me
    111.222.333.444 
  • Android: Install Connectbot on your Android phone and enable port forwarding of 6544. For more info see: http://parker1.co.uk/mythtv_ssh.php
    • Android: Using Connectbot, connect to your MythTV server using the IP address from above (111.222.333.444)
    • Android: Menu -> Port Forwards -> Menu -> Add port forward:
      • Nickname: mythtv
      • Type: local
      • Source port: 6544
      • Destination: localhost:6544
    • Android: Disconnect and reconnect
  • Android: Install and set up MythTV Android Frontend
    • Android: Touch the settings icon -> Away Profiles
      • Name: Away
      • MythTV Master Backend Address: http://localhost:6544/
      Save
    • Android: Away -> Recordings -> Select a show to watch -> watch it

See also:

How to control your DVR from your Android phone

  • Configure your MythTv Frontend on your PC:
    • Setup -> General -> Hit "Next" 6 times -> Check "Enable Network Remote Control"
    • Setup -> Appearance -> Hit "Next" 3 times and
      • Check "Enable LCD device"
      • Check "Display time"
      • Check "Display menus"
      • Check "Display music arstist and title"
      • Check "Display channel information"
  • Install MythDroid on your Android phone
  • Install MDD on your PC
    • Install libimlib2
      $ sudo apt-get install libimlib2-dev 
      
    • Download MDD
      $ wget http://mythdroid.googlecode.com/files/mdd-0.6.2.tgz 
      
    • Install MDD
      $ tar xvf mdd-0.6.2.tgz 
      $ cd mdd 
      $ perl Build.PL 
      
      Type "y" because you are running this on the PC that runs your MythTv frontend
      $ ./Build installdeps 
      
      Hit ENTER to accept all the defaults
      $ ./Build test 
      $ sudo ./Build install 
      
      Type "y" to stop mythfrontend. Then start it again
      $ mythfrontend 
      

How to restore an old database on a fresh install Added 2016-10-16

See https://www.mythtv.org/wiki/Database_Backup_and_Restore

$ wget https://raw.githubusercontent.com/MythTV/mythtv/master/mythtv/programs/scripts/database/mythconverg_restore.pl 
$ chmod a+x mythconverg_restore.pl 
$  ./mythconverg_restore.pl --directory /mnt/hdd2/mythtv/db_backups --filename mythconverg-1317-20161008080149.sql.gz --drop_database --create_database 
$ # if there has been a database schema change, you will need to run mythtv-setup

Help / References

How to prevent nose (unittest) from using the docstring when verbosity >= 2

Some of our Python unit tests have docstrings. I find it annoying that, when using a verbosity level >= 2, nose prints the docstring instead of the class name and method name. Here's a hack to prevent it from doing that: Add a shortDescription() method to the test case class that returns None.

Here is an example of normal behavior:

import unittest

class MyTestCase(unittest.TestCase):
    def test_with_docstring(self):
        """Test that something does something
        """

    def test_without_docstring(self):
        pass
$ nosetests --verbosity=2 tmp.py
Test that something does something ... ok
test_without_docstring (tmp.MyTestCase) ... ok

Here is an example with the hack to prevent printing the docstring:

import unittest

class MyTestCase(unittest.TestCase):
    def shortDescription(self):
        return None

    def test_with_docstring(self):
        """Test that something does something
        """

    def test_without_docstring(self):
        pass
$ nosetests --verbosity=2 tmp.py
test_with_docstring (tmp.MyTestCase) ... ok
test_without_docstring (tmp.MyTestCase) ... ok

Hack to share & sync Google contacts between Android phones

I want to share and sync (in real time) Google (Gmail) contacts with my wife on our Android 2.3.6 Gingerbread phones. Google does not make this easy to do. Here's the best solution I could come up with (ref whitenack on androidcentral). (Note: these are not our real email addresses.)

  • This contact list resides only on the [email protected] account.
  • Contacts are removed from the "My Contacts" group and instead stored in groups called "Angela" and/or "Eliot". For shared contacts, the contact is in both groups. (Contact groups are like tags. A contact can be in multiple groups at the same time.)
  • Contacts in the "Angela" group show up on Angela's phone and contacts in the "Eliot" group show up on Eliot's phone. Contacts in both groups show up in both phones.
  • On Angela's phone, add the [email protected] account and check the box for syncing Contacts and uncheck the box for syncing Contacts from the [email protected] account.
  • On Angela's phone, check the box for displaying the groups "My Contacts" and "Angela" under the [email protected] account and uncheck all the boxes for displaying contacts on the [email protected] account.
  • On Eliot's phone, check the box for displaying the groups "My Contacts" and "Eliot" under the [email protected] account
  • On both phones, set the account used for creating new contacts to [email protected]: Contacts -> More -> Settings -> Contact storage -> Select the [email protected] account
  • When a *new* contact is added on either of the phones, it will be added to the "My Contacts" group on the [email protected] account. These contacts later need to be moved to the "Angela" and/or "Eliot" groups from the browser while signed in to the [email protected] account.
  • The [email protected] account will not be able to view, add, or edit contacts from the browser (Gmail).

We are able to share and sync contacts in real time, however there are annoyances. The main problem is that contact list lives under one account, so it is not available to the secondary user (my wife) when she is using Gmail or wants to manage contacts in the browser. A second minor annoyance is that our Android phones don't allow us to assign a contact to a group, so all new contacts added from our phones will be added to the generic "My Contacts" group and need to be categorized later from the browser.

I also tried the free Google Apps because it has Contact sharing. However, I could not figure out how to get shared contacts to show up in our phones.

Will upgrading to Android 4.0 ICS help?

Test coverage with nose and coverage.py

It's fun to use nose + coverage.py to show my progress as I write tests. Seeing the bar next to my code change from red to green makes me happy. 100% test coverage does not mean tests are complete. For example, a boolean OR'ed conditional expression may not test all conditions even though the line is marked as covered. Other limitations are discussed here: Flaws in coverage measurement. However, good test coverage is at least a step towards having a good test suite.

Install nose and coverage.py

Activate your virtualenv and pip install nose and coverage.

$ pip install nose 
$ pip install coverage 

Run it

Here is the command line I use to run the tests. --with-coverage enables the nose-coverage plugin to check test coverage. --cover-erase erases coverage test results from a previous run. --cover-package specifies which Python package to analyze. Specifiy the package as you would using an import (e.g. dp.blueprints.info.views). If --cover-package is not specified, it will analyze everything. --cover-html enables pretty HTML coverage reports. This example is for the flask-encryptedsession tests.

$ nosetests --with-coverage --cover-erase --cover-package=flask_encryptedsession --cover-html
..........
Name                                      Stmts   Miss  Cover   Missing
-----------------------------------------------------------------------
flask_encryptedsession                        0      0   100%   
flask_encryptedsession.encryptedcookie       41      1    98%   176
flask_encryptedsession.encryptedsession      35      1    97%   75
-----------------------------------------------------------------------
TOTAL                                        76      2    97%   
----------------------------------------------------------------------
Ran 10 tests in 0.188s

OK

Display the HTML report

$ firefox cover/index.html 

Get branch coverage

Branch coverage is useful for checking "if" statements without an explicit "else" in the code. I had to install the development version of nose to use this feature: As of version 1.2.0, this feature is available.

$ pip install https://github.com/nose-devs/nose/tarball/master 
$ nosetests --cover-branches --with-coverage --cover-erase --cover-package=flask_encryptedsession --cover-html 
..........
Name                                      Stmts   Miss Branch BrPart  Cover   Missing
-------------------------------------------------------------------------------------
flask_encryptedsession                        0      0      0      0   100%   
flask_encryptedsession.encryptedcookie       41      1     12      1    96%   176
flask_encryptedsession.encryptedsession      35      1      4      1    95%   75
-------------------------------------------------------------------------------------
TOTAL                                        76      2     16      2    96%   
----------------------------------------------------------------------
Ran 10 tests in 0.234s

OK

Pycon 2012 talks that I saw that I enjoyed

The Pycon 2012 videos are up at pyvideo.org. Here are some of the talks I enjoyed that I saw. I know I probably missed some great talks so I will try to watch more online. Let me know if there are some that I should not miss.

Favorite talk of the conference

Other great talks (in chronological order)

A unique Python redis-based queue with delay

This is a simple Redis-based queue. Two features that I needed were uniqueness (i.e. if an item exists in the queue already, it won't be added again) and a delay, like beanstalkd, where an item must wait a specified time before it can be popped from the queue. There are a number of other Redis-based queues that have many more features but I didn't see one that had these two features together. This 50-line class works for my needs. It may or may not work for you. Feel free to copy this and build on it.

Note: I wrote this in May 2010. I ended up using this solution after trying out beanstalkd and Gearman.

Install

Install on Ubuntu 10.10 Maverick

  • Install the redis server
    $ sudo apt-get install redis-server 
  • Install the python redis client
    $ pip install redis 
  • Default conf file: /etc/redis/redis.conf
    Default log file: /var/log/redis/redis-server.log
    Default db dir: /var/lib/redis
    Stop redis server: sudo /etc/init.d/redis-server stop
    Start redis server: sudo /etc/init.d/redis-server start

Redis commands used

The queue is based on the redis sorted set data type and uses the following commands:

  • ZADD - Add members to a sorted set, or update its score if it already exists
  • ZRANGEBYSCORE - Return a range of members in a sorted set, by score
  • ZREM - Remove one or more members from a sorted set

Code

import time
import redis


REDIS_ADDRESS = '127.0.0.1'


class UniqueMessageQueueWithDelay(object):
    """A message queue based on the Redis sorted set data type. Duplicate items
    in the queue are not allowed. When a duplicate item is added to the queue,
    the new item is added, and the old duplicate item is removed. A delay may be
    specified when adding items to the queue. Items will only be popped after
    the delay has passed. Pop() is non-blocking, so polling must be used. The
    name of the queue == the Redis key for the sorted set.
    """
    def __init__(self, name):
        self.name = name
        self.redis = redis.Redis(REDIS_ADDRESS)

    def add(self, data, delay=0):
        """Add an item to the queue. delay is in seconds.
        """
        score = time.time() + delay
        self.redis.zadd(self.name, data, score)
        debug('Added %.1f, %s' % (score, data))

    def pop(self):
        """Pop one item from the front of the queue. Items are popped only if
        the delay specified in the add() has passed. Return False if no items
        are available.
        """
        min_score = 0
        max_score = time.time()
        result = self.redis.zrangebyscore(
            self.name, min_score, max_score, start=0, num=1, withscores=False)
        if result == None:
            return False
        if len(result) == 1:
            debug('Popped %s' % result[0])
            return result[0]
        else:
            return False

    def remove(self, data):
        return self.redis.zrem(self.name, data)


def debug(msg):
    print msg


def test_queue():
    u = UniqueMessageQueueWithDelay('myqueue')

    # add items to the queue
    for i in [0, 1, 2, 3, 4, 0, 1]:
        data = 'Item %d' % i
        delay = 5
        u.add(data, delay)
        time.sleep(0.1)

    # get items from the queue
    while True:
        print
        result = u.pop()
        print result
        if result != False:
            u.remove(result)
        time.sleep(1)


if __name__ == '__main__':
    test_queue()

Results:

Added 1320773851.8, Item 0
Added 1320773851.9, Item 1
Added 1320773852.0, Item 2
Added 1320773852.1, Item 3
Added 1320773852.2, Item 4
Added 1320773852.3, Item 0
Added 1320773852.4, Item 1

False

False

False

False

False

Popped Item 2
Item 2

Popped Item 3
Item 3

Popped Item 4
Item 4

Popped Item 0
Item 0

Popped Item 1
Item 1

False

False

False
^CTraceback (most recent call last):
  File "umqwdredisqueue.py", line 102, in 
    test_queue()
  File "umqwdredisqueue.py", line 98, in test_queue
    time.sleep(1)
KeyboardInterrupt

See also

Python gnupg (GPG) example

python-gnupg is a Python package for encrypting and decrypting strings or files using GNU Privacy Guard (GnuPG or GPG). GPG is an open source alternative to Pretty Good Privacy (PGP). A popular use of GPG and PGP is encrypting email. For more information, see the python-gnupg documentation. Another option for encrypting data from Python is keyczar.

Install

This installs the Ubuntu GPG package, creates a test user, and installs the Python package, python-gnupg. This was installed on Ubuntu 10.10 Maverick Meerkat.

$ sudo apt-get install gnupg 
$ sudo adduser testgpguser 
$ sudo su testgpguser 
$ cd 
$ virtualenv --no-site-packages venv 
$ source venv/bin/activate 
$ pip install python-gnupg 

Generate a key

This creates a GPG key. This also creates the gpghome directory if it does not exist. You may need to supply random hardware activity during the key generation. See the docs for more information. To generate random numbers, you can also install the rng-tools package.

$ sudo apt-get install rng-tools 
import os
import gnupg

os.system('rm -rf /home/testgpguser/gpghome')
gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
input_data = gpg.gen_key_input(
    name_email='[email protected]',
    passphrase='my passphrase')
key = gpg.gen_key(input_data)
print key
B0F4CF530036CE8CD1C064F17D32CEE72C015CD5

Export keys

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
ascii_armored_public_keys = gpg.export_keys(key)
ascii_armored_private_keys = gpg.export_keys(key, True)
with open('mykeyfile.asc', 'w') as f:
    f.write(ascii_armored_public_keys)
    f.write(ascii_armored_private_keys)
(venv)testgpguser@mymachine:~$ cat mykeyfile.asc 
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)

mI0ETqrVGAEEAP42Xs1vQv40MxA3/g/Le5B0VatnDYaSvAhiYfaub79HY4mjYcCD
FPDo5b54PSzyhlVsz5RL46+RE9NpQ2JdvFofWi7eVzfdmmTtNYEaiUSmzLUq73Vz
qu7P1RhOfwuAyW0otnw/Lw54MVjVZblvp3ln1Fcpleb9ZSrY1h61Y8pHABEBAAG0
REF1dG9nZW5lcmF0ZWQgS2V5IChHZW5lcmF0ZWQgYnkgZ251cGcucHkpIDx0ZXN0
Z3BndXNlckBteWRvbWFpbi5jb20+iLgEEwECACIFAk6q1RgCGy8GCwkIBwMCBhUI
AgkKCwQWAgMBAh4BAheAAAoJEH0yzucsAVzVBjwD/1KgTx1y3cpuumu1HF0GtQV0
Wn7l9OaSj98CqQ/f2emHD1l9rrjdt9jm1g7wSsWumpKs57vxz7NXwHw7mI4qZ5m0
cvg/qRc/BBMP8v2WgzRsmls97Pplaate1k3QfvDCVs6F1qiIQyELffjxBHbmWPhx
XEwhnpLcvk2l7NbNnEwA
=exDD
-----END PGP PUBLIC KEY BLOCK-----
-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v1.4.10 (GNU/Linux)

lQH+BE6q1RgBBAD+Nl7Nb0L+NDMQN/4Py3uQdFWrZw2GkrwIYmH2rm+/R2OJo2HA
gxTw6OW+eD0s8oZVbM+US+OvkRPTaUNiXbxaH1ou3lc33Zpk7TWBGolEpsy1Ku91
c6ruz9UYTn8LgMltKLZ8Py8OeDFY1WW5b6d5Z9RXKZXm/WUq2NYetWPKRwARAQAB
/gMDAq5W6uxeU2hDYDPZ1Yy+e97ppNXmdAeq1urZHmiPr4+a36nOWd6j0R/HBjG3
ELD8CqYiQ0vx8+F9rY/uwKga2bEkJsQXjvaaZtu97lzPyp2+avsaw2G+3jRAJWNL
5YG4c/XwK1cfEajM23f7zz/t6TRWG+Ve2Dzi7+obA0LuF8czSlpiTTEzLDk8QJCK
y2WmrZ+s+POWv3itVpI26o7PvTQESzwyKXdyCW2W66VnXTm4mQEL6kgyV0oO6xIl
QUVSn2XWvwFMg2iL+02zA467rsr1x6Nl8hEQJgFwJCejD2z+4C4yzEeQGFP9WUps
pbMedAjDHebhC9FzbW7yuQ3H7iTCK1mvidAFw2wTdrkH61ApzmSo/rSTSxXw7hLT
M/ONgYZtvr+CpJj+mIu1XvVDiftvMhXlwcvM8c9PB3zv+086K7kJDTnzPgYvL0H/
+V2b9X9BBfAax40MQuxZJWseaLtsxXyl/rhn8jSCFZoqtERBdXRvZ2VuZXJhdGVk
IEtleSAoR2VuZXJhdGVkIGJ5IGdudXBnLnB5KSA8dGVzdGdwZ3VzZXJAbXlkb21h
aW4uY29tPoi4BBMBAgAiBQJOqtUYAhsvBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIX
gAAKCRB9Ms7nLAFc1QY8A/9SoE8dct3KbrprtRxdBrUFdFp+5fTmko/fAqkP39np
hw9Zfa643bfY5tYO8ErFrpqSrOe78c+zV8B8O5iOKmeZtHL4P6kXPwQTD/L9loM0
bJpbPez6ZWmrXtZN0H7wwlbOhdaoiEMhC3348QR25lj4cVxMIZ6S3L5NpezWzZxM
AA==
=v9Z7
-----END PGP PRIVATE KEY BLOCK-----

Import keys

import gnupg
from pprint import pprint

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
key_data = open('mykeyfile.asc').read()
import_result = gpg.import_keys(key_data)
pprint(import_result.results)
[{'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'ok': u'0',
  'text': 'Not actually changed\n'},
 {'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'ok': u'16',
  'text': 'Contains private key\nNot actually changed\n'}]

List keys

import gnupg
from pprint import pprint

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
public_keys = gpg.list_keys()
private_keys = gpg.list_keys(True)
print 'public keys:'
pprint(public_keys)
print 'private keys:'
pprint(private_keys)
public keys:
[{'algo': u'1',
  'date': u'1319818520',
  'dummy': u'',
  'expires': u'',
  'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'keyid': u'7D32CEE72C015CD5',
  'length': u'1024',
  'ownertrust': u'u',
  'trust': u'u',
  'type': u'pub',
  'uids': [u'Autogenerated Key (Generated by gnupg.py) ']}]
private keys:
[{'algo': u'1',
  'date': u'1319818520',
  'dummy': u'',
  'expires': u'',
  'fingerprint': u'B0F4CF530036CE8CD1C064F17D32CEE72C015CD5',
  'keyid': u'7D32CEE72C015CD5',
  'length': u'1024',
  'ownertrust': u'',
  'trust': u'',
  'type': u'sec',
  'uids': [u'Autogenerated Key (Generated by gnupg.py) ']}]

Encrypt a string

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
unencrypted_string = 'Who are you? How did you get in my house?'
encrypted_data = gpg.encrypt(unencrypted_string, '[email protected]')
encrypted_string = str(encrypted_data)
print 'ok: ', encrypted_data.ok
print 'status: ', encrypted_data.status
print 'stderr: ', encrypted_data.stderr
print 'unencrypted_string: ', unencrypted_string
print 'encrypted_string: ', encrypted_string
ok:  True
status:  encryption ok
stderr:  [GNUPG:] BEGIN_ENCRYPTION 2 9
[GNUPG:] END_ENCRYPTION

unencrypted_string:  Who are you? How did you get in my house?
encrypted_string:  -----BEGIN PGP MESSAGE-----
Version: GnuPG v1.4.10 (GNU/Linux)

hIwDFuhrAS77HYIBBACXqZ66rkGQv8yE61JddEmad3fUNvbfkhBPUI9OSaMO3PbN
Q/6SIDyi3FmhbM9icOBS7q3xddQpvFhwmrq9e3VLKnV3NSmWo+xJWosQ/GNAA/Hb
cwF1pOtR6bRHFBkqtmpTYnBo9rMpokW8lp4WxFxMda+af8TlId8HC0WcRUg4kNJi
AdV1fsd+sD/cGIp0cAltpaVuO4/uwV9lKd39VER6WigLDaeFUHjWhJbcHwTaJYHj
qmy5LRciNSjwsqeMK4zOFZyRPUqPVKwWLiE9kImMni0Nj/K54ElWujgTttZIlBqV
5+c=
=SM4r
-----END PGP MESSAGE-----

Decrypt a string

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
unencrypted_string = 'Who are you? How did you get in my house?'
encrypted_data = gpg.encrypt(unencrypted_string, '[email protected]')
encrypted_string = str(encrypted_data)
decrypted_data = gpg.decrypt(encrypted_string, passphrase='my passphrase')

print 'ok: ', decrypted_data.ok
print 'status: ', decrypted_data.status
print 'stderr: ', decrypted_data.stderr
print 'decrypted string: ', decrypted_data.data
ok:  True
status:  decryption ok
stderr:  [GNUPG:] ENC_TO 16E86B012EFB1D82 1 0
[GNUPG:] USERID_HINT 16E86B012EFB1D82 Autogenerated Key (Generated by gnupg.py) 
[GNUPG:] NEED_PASSPHRASE 16E86B012EFB1D82 16E86B012EFB1D82 1 0
[GNUPG:] GOOD_PASSPHRASE
gpg: encrypted with 1024-bit RSA key, ID 2EFB1D82, created 2011-11-02
      "Autogenerated Key (Generated by gnupg.py) "
[GNUPG:] BEGIN_DECRYPTION
[GNUPG:] PLAINTEXT 62 1320545729 
[GNUPG:] PLAINTEXT_LENGTH 41
[GNUPG:] DECRYPTION_OKAY
[GNUPG:] GOODMDC
[GNUPG:] END_DECRYPTION

decrypted string:  Who are you? How did you get in my house?

Encrypt a file

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
open('my-unencrypted.txt', 'w').write('You need to Google Venn diagram.')
with open('my-unencrypted.txt', 'rb') as f:
    status = gpg.encrypt_file(
        f, recipients=['[email protected]'],
        output='my-encrypted.txt.gpg')

print 'ok: ', status.ok
print 'status: ', status.status
print 'stderr: ', status.stderr
ok:  True
status:  encryption ok
stderr:  [GNUPG:] BEGIN_ENCRYPTION 2 9
[GNUPG:] END_ENCRYPTION
(venv)testgpguser@mymachine:~$  cat my-encrypted.txt.gpg 
-----BEGIN PGP MESSAGE-----
Version: GnuPG v1.4.10 (GNU/Linux)

hIwDfTLO5ywBXNUBBADo7trFZUD6Ir1vPRAJsoQXDiiw32N1m9/PXWCnQqX0nyzW
LfluNMfLFQRclNPVEg+o91qhS71apKvagp8DW7SCDE2SdCYk8nAS3bwAg5+GUyDs
XY2E6BQ1cLA1eK1V6D15ih6cq0laRzWuFkehH9PQ5Yp4ZZOmCbopw7dufnYPjdJb
AVGLpZRq64SuN1BUWIHbO7vqQGFq7qhGQwuegblEMm4vyr6FBW6JA/x4G/PMfImZ
1cH6KBrWGWrLCTiU/FKG9JvOm8mg8NXzd/TVjPs6rHRaKPFln37T7cLUwA==
=FSQP
-----END PGP MESSAGE-----

Decrypt a file

import gnupg

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
with open('my-encrypted.txt.gpg', 'rb') as f:
    status = gpg.decrypt_file(f, passphrase='my passphrase', output='my-decrypted.txt')

print 'ok: ', status.ok
print 'status: ', status.status
print 'stderr: ', status.stderr
ok:  True
status:  decryption ok
stderr:  [GNUPG:] ENC_TO 16E86B012EFB1D82 1 0
[GNUPG:] USERID_HINT 16E86B012EFB1D82 Autogenerated Key (Generated by gnupg.py) 
[GNUPG:] NEED_PASSPHRASE 16E86B012EFB1D82 16E86B012EFB1D82 1 0
[GNUPG:] GOOD_PASSPHRASE
gpg: encrypted with 1024-bit RSA key, ID 2EFB1D82, created 2011-11-02
      "Autogenerated Key (Generated by gnupg.py) "
[GNUPG:] BEGIN_DECRYPTION
[GNUPG:] PLAINTEXT 62 1320546031 
[GNUPG:] PLAINTEXT_LENGTH 32
[GNUPG:] DECRYPTION_OKAY
[GNUPG:] GOODMDC
[GNUPG:] END_DECRYPTION
(venv)testgpguser@mymachine:~$ cat my-decrypted.txt 
You need to Google Venn diagram.

Using curl over ftp took 3+ minutes for a 4 byte file w/ EPSV

It took over 3 minutes to download a 4 byte file with curl via ftp. It took less than a second with wget.

$ time curl -o testfile.txt -u myusername:mypassword ftp://ftp.myserver.com/path/to/testfile.txt 
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4    0     4    0     0      0      0 --:--:--  0:03:09 --:--:--     0

real    3m9.411s
user    0m0.000s
sys     0m0.050s
Running curl with the verbose option

Running curl with the verbose option showed it was failing in "Extended Passive Mode".

$ curl -v -o testfile.txt -u myusername:mypassword ftp://ftp.myserver.com/path/to/testfile.txt
* About to connect() to ftp.myserver.com port 21 (#0)
*   Trying 10.1.2.102...   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0connected
* Connected to ftp.myserver.com (10.1.2.102) port 21 (#0)
< 220 Welcome! 01-srv.
> USER myusername
< 331 Please specify the password.
> PASS mypassword
< 230 Login successful.
> PWD
< 257 "/"
* Entry path is '/'
> CWD dmi
< 250 Directory successfully changed.
> EPSV
* Connect data stream passively
< 229 Entering Extended Passive Mode (|||58226|)
  0     0    0     0    0     0      0      0 --:--:--  0:03:09 --:--:--     0Connection timed out
* couldn't connect to host
* got positive EPSV response, but can't connect. Disabling EPSV
> PASV
< 227 Entering Passive Mode (10,1,2,102,190,42)
*   Trying 10.1.2.102... connected
* Connecting to 10.1.2.102 (10.1.2.102) port 48682
> TYPE I
< 200 Switching to Binary mode.
> SIZE testfile.txt
< 213 4
> RETR testfile.txt
< 150 Opening BINARY mode data connection for testfile.txt (4 bytes).
* Maxdownload = -1
* Getting file with size: 4
{ [data not shown]
* Remembering we are in dir "path/to/"
< 226 File send OK.
Disabling EPSV

I googled and found that some servers have a problem using Extended Passive Mode (EPSV). To disable Extended Passive Mode, use the --disable-epsv option. With this option, the download took 0.046 seconds.

$ time curl --disable-epsv -o testfile.txt -u myusername:mypassword ftp://ftp.myserver.com/path/to/testfile.txt
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100     4  100     4    0     0    112      0 --:--:-- --:--:-- --:--:--   133

real    0m0.046s
user    0m0.010s
sys     0m0.000s
Disabling EPSV with libcurl

Set CURLOPT_FTP_USE_EPSV to 0. See http://curl.haxx.se/libcurl/c/curl_easy_setopt.html.

Disabling EPSV with pycurl
import pycurl

c = pycurl.Curl()
c.setopt(c.FTP_USE_EPSV, 0)

Some more python recursion examples

I had a number of yaml files that contained passwords that needed encrypting. After parsing the yaml file with pyyaml, the data looked something like this:

EXAMPLE_DATA = {
    'jobs': [{'frequency': '* * * * *',
              'jobconfig': [{'config': [('*',
                                         {'maxspeed': 1048576,
                                          'password': 'onesecretpassword',
                                          'port': 22,
                                          'url': 'basset://basset1.domain.com/tootsiepop/123.csv',
                                          'username': 'myusername'})],
                             'hasbro': 'basset'},
                            {'config': [('*',
                                         {'field_delim': ',',
                                          'field_line': True,
                                          'no_blanks': True,
                                          'quote_char': '"'})],
                             'hasbro': 'pen'},
                            {'config': [('*',
                                         {'db_database': 'mydatabase',
                                          'db_host': 'myhost',
                                          'db_password': 'anothersecretpassword',
                                          'db_table': 'mytable',
                                          'db_user': 'myuser'})],
                             'hasbro': 'dart'}],
              'jobdesc': 'Data from tootsiepop',
              'jobname': 'tootsiepop',
              'max_records_fail': '110%',
              'min_failure_time': '1000y'}],
    'vendor': 'tootsiepop'}

Here is a recursive function that prints all the leaf nodes of my nested data structure.

def print_all_leaf_nodes(data):
    if isinstance(data, dict):
        for item in data.values():
            print_all_leaf_nodes(item)
    elif isinstance(data, list) or isinstance(data, tuple):
        for item in data:
            print_all_leaf_nodes(item)
    else:
        print data

print_all_leaf_nodes(EXAMPLE_DATA)

Results:

tootsiepop
1000y
tootsiepop
*
basset://basset1.domain.com/tootsiepop/123.csv
myusername
onesecretpassword
1048576
22
basset
*
True
"
,
True
pen
*
anothersecretpassword
mytable
myhost
mydatabase
myuser
dart
* * * * *
110%
Data from tootsiepop

Get all leaf nodes

This function returns all leaf nodes as a list instead of printing them. A wrapper function is used to create a Namespace instance to hold the results variable. This could alternatively be stored in a global (module-level) variable. See my notes on variable scope for more info about using a class as a namespace.

def get_all_leaf_nodes(data):
    class Namespace(object):
        pass
    ns = Namespace()
    ns.results = []

    def inner(data):
        if isinstance(data, dict):
            for item in data.values():
                inner(item)
        elif isinstance(data, list) or isinstance(data, tuple):
            for item in data:
                inner(item)
        else:
            ns.results.append(data)

    inner(data)
    return ns.results

from pprint import pprint
pprint(get_all_leaf_nodes(EXAMPLE_DATA))

Results:

['tootsiepop',
 '1000y',
 'tootsiepop',
 '*',
 'basset://basset1.domain.com/tootsiepop/123.csv',
 'myusername',
 'onesecretpassword',
 1048576,
 22,
 'basset',
 '*',
 True,
 '"',
 ',',
 True,
 'pen',
 '*',
 'anothersecretpassword',
 'mytable',
 'myhost',
 'mydatabase',
 'myuser',
 'dart',
 '* * * * *',
 '110%',
 'Data from tootsiepop']

Get all leaf key value pairs

This function gets all key value pairs where values are not compound data structures (i.e. dicts or lists)

def get_all_key_value_pairs_where_values_are_simple(data):
    class Namespace(object):
        pass
    ns = Namespace()
    ns.results = []

    def inner(data):
        if isinstance(data, dict):
            for k, v in data.iteritems():
                if (isinstance(v, dict) or
                    isinstance(v, list) or
                    isinstance(v, tuple)
                    ):
                    inner(v)
                else:
                    ns.results.append((k, v))
        elif isinstance(data, list) or isinstance(data, tuple):
            for item in data:
                inner(item)

    inner(data)
    return ns.results

from pprint import pprint
pprint(get_all_key_value_pairs_where_values_are_simple(EXAMPLE_DATA))

Results:

[('vendor', 'tootsiepop'),
 ('min_failure_time', '1000y'),
 ('jobname', 'tootsiepop'),
 ('url', 'basset://basset1.domain.com/tootsiepop/123.csv'),
 ('username', 'myusername'),
 ('password', 'onesecretpassword'),
 ('maxspeed', 1048576),
 ('port', 22),
 ('hasbro', 'basset'),
 ('field_line', True),
 ('quote_char', '"'),
 ('field_delim', ','),
 ('no_blanks', True),
 ('hasbro', 'pen'),
 ('db_password', 'anothersecretpassword'),
 ('db_table', 'mytable'),
 ('db_host', 'myhost'),
 ('db_database', 'mydatabase'),
 ('db_user', 'myuser'),
 ('hasbro', 'dart'),
 ('frequency', '* * * * *'),
 ('max_records_fail', '110%'),
 ('jobdesc', 'Data from tootsiepop')]

Modify values of terminal key value in a nested dict

This function modifies all the values of all dicts that are not compound data structures (i.e. dicts or lists). The modfn argument is a function that modifies the key value pair. It should accept two arguments: a key and value and it should return the modified value.

The example function, super_secure_encrypt is a function that checks if the string 'password' is in the key, and "encrypts" the value using the <sarcasm>super secure</sarcasm> ROT13 algorithm. (We are actually using the keyczar toolkit from google to do the encryption.)

def modify_all_simple_dict_values(data, modfn):
    if isinstance(data, dict):
        for k, v in data.iteritems():
            if (isinstance(v, dict) or
                isinstance(v, list) or
                isinstance(v, tuple)
                ):
                modify_all_simple_dict_values(v, modfn)
            else:
                data[k] = modfn(k, v)
    elif isinstance(data, list) or isinstance(data, tuple):
        for item in data:
            modify_all_simple_dict_values(item, modfn)

    return data


def super_secure_encrypt(key, value):
    if 'password' in key:
        value = value.encode('rot13')
    return value


from pprint import pprint
pprint(modify_all_simple_dict_values(EXAMPLE_DATA, super_secure_encrypt))

Results:

{'jobs': [{'frequency': '* * * * *',
           'jobconfig': [{'config': [('*',
                                      {'maxspeed': 1048576,
                                       'password': 'barfrpergcnffjbeq',
                                       'port': 22,
                                       'url': 'basset://basset1.domain.com/tootsiepop/123.csv',
                                       'username': 'myusername'})],
                          'hasbro': 'basset'},
                         {'config': [('*',
                                      {'field_delim': ',',
                                       'field_line': True,
                                       'no_blanks': True,
                                       'quote_char': '"'})],
                          'hasbro': 'pen'},
                         {'config': [('*',
                                      {'db_database': 'mydatabase',
                                       'db_host': 'myhost',
                                       'db_password': 'nabgurefrpergcnffjbeq',
                                       'db_table': 'mytable',
                                       'db_user': 'myuser'})],
                          'hasbro': 'dart'}],
           'jobdesc': 'Data from tootsiepop',
           'jobname': 'tootsiepop',
           'max_records_fail': '110%',
           'min_failure_time': '1000y'}],
 'vendor': 'tootsiepop'}

How to remove ^M characters from a file with Python

Use the following Python script to remove ^M (carriage return) characters from your file and replace them with newline characters only. To do this in Emacs, see my notes here.

remove_ctrl_m_chars.py:

import os
import sys
import tempfile


def main():
    filename = sys.argv[1]
    with tempfile.NamedTemporaryFile(delete=False) as fh:
        for line in open(filename):
            line = line.rstrip()
            fh.write(line + '\n')
        os.rename(filename, filename + '.bak')
        os.rename(fh.name, filename)


if __name__ == '__main__':
    main()

Run it

$ python remove_ctrl_m_chars.py myfile.txt 

Documentation