Unpickling GitPython datetimes


I’ve been playing around with GitPython recently, in an effort to analyse the relation between commits and software quality.

One by-product of this analysis was a Pandas Series of the number of commits on a given day. Since this turned out to be a time-consuming operation (as I needed to repoint head back in time for each day I was interested in), I opted to pickle the Series. Imagine the horror when, the day after I had run the script, I discovered that unpickling the data raised an exception.

In [4]: commits = pd.read_pickle('commits.pkl')
TypeError: __init__() takes at least 2 arguments (1 given)

That error comes from pickle_compat.py, part of the Pandas library.
However, no mention is made of which class actually raised it.

Entering %debug and going up and down the stack didn’t reveal much either, so I decided to go closer to the actual unpickling operation, using cPickle.

In [10]: commits = cPickle.load(open('commits.pkl'))
TypeError: ('__init__() takes at least 2 arguments (1 given)', <class 'git.objects.util.tzoffset'>, ())

Still an error, but a more meaningful one. Let’s see what a brief inspection of tzoffset shows.

In [11]: import git.objects.util

In [12]: git.objects.util.tzoffset?
Init signature: git.objects.util.tzoffset(self, secs_west_of_utc, name=None)
File: /opt/bats/lib/python2.7/site-packages/git/objects/util.py
Type: type

So __init__ expects a secs_west_of_utc positional argument (no default).

To be able to unpickle the data without running the script again, you just need to mock that class with a slightly modified one: partial application to the rescue.

In [19]: from functools import partial

In [20]: git.objects.util.tzoffset = partial(git.objects.util.tzoffset, secs_west_of_utc=0)
In [21]: commits = pickle.load(open('commits.pkl'))
In [22]:

Job done – thank you functools!
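For the record, the failure mode and the fix can be reproduced without GitPython at all. `TzOffset` below is a hypothetical stand-in for `git.objects.util.tzoffset`, with the same kind of `__reduce__` that stores no constructor arguments:

```python
import pickle
from functools import partial

class TzOffset(object):
    """Hypothetical stand-in for git.objects.util.tzoffset."""
    def __init__(self, secs_west_of_utc, name=None):
        self.secs_west_of_utc = secs_west_of_utc
        self.name = name

    def __reduce__(self):
        # Like the problematic GitPython class: no constructor arguments
        # are stored, so plain unpickling calls TzOffset() and fails.
        return (TzOffset, ())

data = pickle.dumps(TzOffset(0))

try:
    pickle.loads(data)
except TypeError:
    pass  # __init__() is missing its required secs_west_of_utc argument

# The fix: rebind the name the pickle refers to, so that the lookup
# finds a partial application supplying the missing argument.
TzOffset = partial(TzOffset, secs_west_of_utc=0)

restored = pickle.loads(data)
print(restored.secs_west_of_utc)  # 0
```

This works because pickle stores the class by name and looks it up again at load time, so rebinding the name is enough.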


The connection string of a SQLAlchemy connection

In the middle of a Pdb session, while debugging a test, I found myself with a SQLAlchemy connection object, which was connected to… some database. To figure out which database it was connected to, I could scan the code to see where the connection had been initialised.

However, there’s a quicker way: looking at the _dsn variable of the underlying connection object – DSN standing for Data Source Name.

(Pdb) p conn.connection._dsn
'host=thehostname dbname=master_db_02 user=the_usr password=the_password connect_timeout=5 application_name=/usr/bin/nosetests'

Removing LaTeX commands using Python “re” module

Recently I had to sanitize lines in a .tex file where a \textcolor command had been used.
The command was being used the following way: {\textcolor{some_color}{text to color}}.

The main problem was that the command could appear any number of times in a line, so I couldn’t apply a substitution a fixed number of times.
Also, given that any color could have been used, a simple “blind replace” was clearly not a good weapon in this case.

I therefore resorted to applying a regex repeatedly until the line was cleared of any \textcolor command.

In a nutshell:

def discolor(line):
    regex = re.compile(r'(.*?)\{\\textcolor\{.*?\}(\{.*?\})\}(.*)')
    while True:
        try:
            line = ''.join(re.search(regex, line).groups())
        except AttributeError:
            return line

The key part here is that we match not only the text inside the \textcolor command, but also what comes before and after it (the (.*?) and (.*) groups). We join them all back together, looping until no match is left: when that happens, re.search returns None, so accessing .groups() raises an AttributeError, which we catch and use as a sentinel to know when to return.
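Here it is in action on a line with two \textcolor commands (the function is repeated so the snippet is self-contained):

```python
import re

def discolor(line):
    """Strip every {\\textcolor{color}{text}} wrapper, keeping {text}."""
    regex = re.compile(r'(.*?)\{\\textcolor\{.*?\}(\{.*?\})\}(.*)')
    while True:
        try:
            line = ''.join(re.search(regex, line).groups())
        except AttributeError:
            return line

line = r'Use {\textcolor{red}{this}} and {\textcolor{blue}{that}}.'
print(discolor(line))  # Use {this} and {that}.
```

Note the inner braces survive, because the second group captures them; move the braces outside the group if you want the bare text.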

Timezones and DST in Python

It’s incredible how fiddly it is to work with timezones.

Today, 14th of June—and this is important—I was trying to convert a made-up datetime from “Europe/London” to UTC.

I instinctively tried out this:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=pytz.timezone('Europe/London'))
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/London' GMT0:00:00 STD>)

At this point you will notice it didn’t take into account the DST offset (it should read BST).

As a further confirmation, converting to UTC keeps the same time:
>>> pytz.UTC.normalize(almostMidnight)
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<UTC>)

Notice this result would be fine during the winter, so depending on how much attention you pay and when you write the code, you might miss this bug – which is why I love having the same suite of tests always running on a system whose clock lives right after the upcoming DST change.

Even subtler: if you were to convert to a different geographical timezone, one that observes DST, you would see this:
>>> almostMidnight.astimezone(pytz.timezone('Europe/Rome'))
datetime.datetime(2017, 6, 15, 1, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/Rome' CEST+2:00:00 DST>)

Interesting. Now DST is accounted for. So converting to geographical timezones might also mask the problem.

Long story short, the correct way *I believe* to convert the timezone of a datetime object to UTC is to create a naive datetime object (no timezone info attached) representing local time, and then call the localize() method of the timezone of interest. In code:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999)
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999)
>>> pytz.timezone('Europe/London').localize(almostMidnight).astimezone(pytz.UTC)
datetime.datetime(2017, 6, 14, 22, 59, 59, 999999, tzinfo=<UTC>)
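To make the difference explicit, here is the same conversion as a small self-contained check (assuming pytz is available; the date is the same made-up June datetime):

```python
from datetime import datetime, timedelta
import pytz

london = pytz.timezone('Europe/London')
naive = datetime(2017, 6, 14, 23, 59, 59)

# localize() consults the tz database and applies the offset actually
# in force on that date: BST, i.e. UTC+1, in June.
local = london.localize(naive)
assert local.utcoffset() == timedelta(hours=1)

utc = local.astimezone(pytz.UTC)
print(utc)  # 2017-06-14 22:59:59+00:00
```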

There’s a very nice read on timezones by Armin Ronacher, which I recommend.

Remote tail

You might have wondered why I felt the urge to specify “local” in the title of last post. Well, fast forward a few days since then, for a similar set of tests I also needed to check a log file on a remote Linux machine – that is, I needed some kind of remote tail.

ssh tail -f

We already know select will be part of our tool set. On top of that, we’ll need to forward the command across the network – and a nice way of doing that is over SSH. In Python, this task is relatively simple if you choose to use paramiko, a third party library implementing the SSHv2 protocol.

A few caveats here as well. The following snippet is a rough prototype to demonstrate the functionality. It fit the bill for me, but YMMV. Of course many aspects can be improved, starting for instance with isBeginningOfMessage, which would be better placed in a derived class, so that different BOM patterns can be handled. Closing the SSH channel cleanly is also something you might want to polish before using this class.

import re
import select
from collections import deque

import paramiko

class SSHTail(object):
    """Tail a remote file and store new messages in a queue"""

    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    TIMEOUT = 1000  # milliseconds
    BUF_SIZE = 1024
    NEWLINE_CHARS = {'\n', '\r'}

    def __init__(self, host, path):
        self.host = host
        self.path = path
        self.poller = select.poll()
        self.messageQueue = deque()

    def start(self):
        """Start the tail command and return the queue used to
        store read messages
        """
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        # assumes key-based auth for the current user is set up
        client.connect(self.host)
        self.client = client

        transport = self.client.get_transport()
        self.transport = transport

        channel = self.transport.open_session()
        channel.exec_command("tail -F %s" % self.path)
        self.channel = channel

        self.poller.register(self.channel, self.READ_ONLY)

        return self.messageQueue

    def isBeginningOfMessage(self, line):
        """Return True if the line starts with the hardcoded Beginning of
        Message pattern
        """
        BOMPattern = ''  # replace with your own BOM pattern
        return re.match(BOMPattern, line)

    def loop(self):
        """Whilst the SSH tunnel is active, keep polling for new
        content and call parseBuffer() to parse it into messages
        """
        while self.transport.is_active():
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                if flag & (select.POLLIN | select.POLLPRI):
                    buf = self.channel.recv(self.BUF_SIZE)
                    self.parseBuffer(buf)

    def parseBuffer(self, buf):
        """Given a buffer buf, split it into messages and glue it together to
        previous messages, if buf is not the beginning of a message.
        Note: assumes each message is on its own line.
        """
        if buf:
            messages = buf.splitlines()

            oldest = messages[0]
            if not self.isBeginningOfMessage(oldest):
                try:
                    # glue onto the most recent stored message
                    messages[0] = self.messageQueue.pop() + oldest
                except IndexError:
                    pass  # queue is empty: nothing to glue onto

            for message in messages:
                self.messageQueue.append(message)

Local tail

tail -f

A while ago I needed, for one of my tests, to monitor a log file on a Linux system and store any new lines, so that I could access the added content at the end of the test. In a sense, I needed a kind of buffered tail -f on a local file.

A quick search led me to the select module.
Without further ado, here’s the code to watch one or more files, and to store anything added to those files in a message queue.

It’s a quick and dirty version which can be improved in many ways. For starters, the message queues are keyed by the file objects themselves, pretty useless in general, but good enough in my case. Second, notice the file is never closed explicitly: definitely not ideal.

import select
from collections import deque

class Watcher(object):
    TIMEOUT = 1000  # milliseconds
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR

    def __init__(self):
        """Initialize the Watcher"""
        self.poller = select.poll()

        self.fd_to_socket = {}
        self.message_queues = {}

    def addFile(self, path):
        """Add a file to monitor.

        :path: absolute path of the file, including the filename
        """
        f = open(path)
        self.poller.register(f, self.READ_ONLY)
        self.fd_to_socket[f.fileno()] = f
        self.message_queues[f] = deque()

    def start(self):
        """Start polling files"""
        while True:
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                s = self.fd_to_socket[fd]
                if flag & (select.POLLIN | select.POLLPRI):
                    lines = s.readlines()
                    if lines:
                        self.message_queues[s].extend(lines)
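The pattern can be exercised end-to-end with a throwaway file. A minimal stdlib-only sketch (note that poll() reports regular files as always readable, so the real work is done by readlines()):

```python
import os
import select
import tempfile

# a throwaway file standing in for the log
fd, path = tempfile.mkstemp()
os.close(fd)

f = open(path)
poller = select.poll()
poller.register(f, select.POLLIN | select.POLLPRI)

# simulate a writer appending a line to the log
with open(path, 'a') as writer:
    writer.write('new log line\n')

captured = []
for fileno, flag in poller.poll(1000):
    if flag & (select.POLLIN | select.POLLPRI):
        captured.extend(f.readlines())

f.close()
os.remove(path)
print(captured)  # ['new log line\n']
```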

Append an item to an OrderedDict

I needed a way to append an item to an OrderedDict without creating a new object (too expensive), and I stumbled upon this answer on StackOverflow.

The answer gives a solution to the inverse problem (that is, prepending an item), but was good enough to be modified for my situation, without me needing to delve too much into the details of the OrderedDict data structure (it’s basically a linked list, under the hood).

Enough said, here it is for future reference:

from collections import OrderedDict

class MyOrderedDict(OrderedDict):
    def append(self, key, value):
        # relies on the internals of Python 2.7's pure-Python OrderedDict:
        # __root is the sentinel node [prev, next, key] of a circular
        # doubly-linked list, and __map maps each key to its node
        root = self._OrderedDict__root
        last = root[0]

        if key in self:
            raise KeyError("key already present: %r" % key)
        root[0] = last[1] = self._OrderedDict__map[key] = [last, root, key]
        dict.__setitem__(self, key, value)
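Note the private names above (_OrderedDict__root, _OrderedDict__map) belong to the pure-Python OrderedDict of Python 2.7; the C implementation in Python 3 does not expose them. There, a plain assignment already appends a new key at the end, and move_to_end() handles repositioning:

```python
from collections import OrderedDict

d = OrderedDict([('a', 1), ('b', 2)])

# A plain assignment of a *new* key appends it at the end
d['c'] = 3
print(list(d.keys()))  # ['a', 'b', 'c']

# Python 3's move_to_end() repositions an existing key
d.move_to_end('a')
print(list(d.keys()))  # ['b', 'c', 'a']
```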

Decoding RM4SCC for fun

I recently got curious about the bar code I could sometimes find on letters addressed to me. I noticed it uses just 4 symbols, always begins and ends the same way, and is the same on all letters, regardless of the sender.

Armed with this basic information, after a bit of research I found out that the code is called Royal Mail 4-State Customer Code. Even more curious, I decided to write a simple decoder for it and, all of a sudden, all the knowledge of signal processing and telecommunication systems returned, vivid in my mind, years after I took those classes (which I very much enjoyed, I must admit). Here is how I did it.

TL;DR: I put the code for the rm4sccdec (RM4SCC decoder) on GitHub. Use it at your own risk, as it’s not production-ready and needs some tweaking to reliably scan all types of image. I’ve used Python with OpenCV and numpy.

Step one: image pre-processing

The code does not include any information in the colour, which means we can simply get rid of the colour information and transform the image to greyscale.

Next, we want to maximise the “distance” between the information (the bars) and the noise (the background): this is usually done by thresholding the image. Using a global value for thresholding does not always give good results, especially when different areas of the image are characterised by different illumination. Some more advanced techniques, such as Otsu thresholding method (which I used in my decoder), are a better fit.

Finally, it is possible to have some residual noise, due to the thresholding process, whereby some white pixels are present in black areas and vice-versa. This is called salt’n’pepper noise and can effectively be filtered with median filters, which substitute the value of a pixel with the median of those around. The great advantage is that it preserves the edges of the image.
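In the decoder I used OpenCV’s implementation of these steps; to show the idea without assuming OpenCV is installed, here is a pure-numpy sketch of Otsu’s method on a made-up bimodal “image” (dark background around 20, bright bars around 200):

```python
import numpy as np

def otsu_threshold(gray):
    """Return the Otsu threshold of a greyscale uint8 image: the value
    that maximises the between-class variance of the two pixel classes."""
    hist = np.bincount(gray.ravel(), minlength=256).astype(float)
    total = hist.sum()
    cum_w = np.cumsum(hist)                    # class-0 weight up to t
    cum_m = np.cumsum(hist * np.arange(256))   # class-0 intensity mass
    best_t, best_var = 0, -1.0
    for t in range(256):
        w0 = cum_w[t]
        w1 = total - w0
        if w0 == 0 or w1 == 0:
            continue
        m0 = cum_m[t] / w0
        m1 = (cum_m[-1] - cum_m[t]) / w1
        var_between = w0 * w1 * (m0 - m1) ** 2
        if var_between > best_var:
            best_var, best_t = var_between, t
    return best_t

# Toy bimodal image: dark background vs bright bars
img = np.array([[20, 25, 200, 210],
                [22, 30, 205, 195]], dtype=np.uint8)
t = otsu_threshold(img)
binary = img > t
print(t, int(binary.sum()))  # 30 4
```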

Step two: feature selection, extraction, and classification

Now we can start thinking about the features defining our symbols. We know we have 4 symbols, which we can call ascender, descender, long (Full Height, in the image), and short (Tracker, in the image).

The 4 symbols used in RM4SCC, from Wikipedia

The first obvious feature we can select is the vertical position of each bar. After all, that’s the information we need to decode the codeword. However, if we choose the 4 points determining each bar, we’d probably end up complicating the decoding process too much.

An easy way out is to choose the centroid position (just its y-coordinate is necessary) for each bar. Notice, though, that the long and short bars will share the same feature value. If we go down this path, we need another feature to distinguish (at least) the long bar from the short bar. The second obvious feature is therefore the size or, more accurately, the area. This feature will allow us to distinguish long from short easily, but it will be pretty useless for the ascender and the descender.

For the extraction, we need to segment the image and find all the bars, and compute the so-called moments for each of them. The first three moments will be enough for us to get all the features we are interested in.
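A sketch of the extraction step on made-up data: the area is the zeroth moment, and the centroid is the ratio of the first moments to it. The two hypothetical bars below share the same centroid y but differ in area, which is exactly why both features are needed:

```python
import numpy as np

def centroid_and_area(mask):
    """Area (zeroth moment) and centroid (first moments / area)
    of a binary region."""
    ys, xs = np.nonzero(mask)
    m00 = len(xs)                     # area: number of set pixels
    cx, cy = xs.mean(), ys.mean()     # centroid coordinates
    return m00, cx, cy

# Two hypothetical bars on a 6x5 grid
long_bar = np.zeros((6, 5), dtype=bool)
long_bar[0:6, 1] = True    # full-height bar in column 1

short_bar = np.zeros((6, 5), dtype=bool)
short_bar[2:4, 3] = True   # short, vertically centred bar in column 3

area, cx, cy = centroid_and_area(long_bar)
print(area, cx, cy)   # 6 1.0 2.5
area, cx, cy = centroid_and_area(short_bar)
print(area, cx, cy)   # 2 3.0 2.5
```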

As a side note, as the segmentation function I have used does not return all segments in order, I had to extract the x-coordinate for each bar so as to be able and sort the vector of symbols.

If the code scanned is reasonably horizontal, we should be able to classify all four symbols pretty easily. For this bit I resorted to K-means clustering, although other classification methods can be used with similar results.
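A minimal 1-D K-means (Lloyd’s algorithm) is enough to separate, say, the two area clusters. This toy version uses made-up areas and a deterministic percentile initialisation instead of a random one:

```python
import numpy as np

def kmeans_1d(values, k, iters=20):
    """Tiny Lloyd's algorithm on scalar features (a sketch, not
    production code: fixed iteration count, no empty-cluster handling
    beyond skipping the update)."""
    values = np.asarray(values, dtype=float)
    # initialise centres spread across the data range
    centres = np.percentile(values, np.linspace(0, 100, k))
    for _ in range(iters):
        # assign each value to the nearest centre
        labels = np.argmin(np.abs(values[:, None] - centres[None, :]), axis=1)
        # move each centre to the mean of its cluster
        for j in range(k):
            if np.any(labels == j):
                centres[j] = values[labels == j].mean()
    return labels, centres

# Hypothetical bar areas: long bars around 6 pixels, short around 2
areas = [6, 5, 6, 2, 2, 6, 2]
labels, centres = kmeans_1d(areas, 2)
print(labels)  # [1 1 1 0 0 1 0]
```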

Step three: the actual decoding

If we don’t consider the starting and ending symbols, the symbols in between are grouped in fours. For this reason we first need to build a dictionary that maps every valid group of 4 symbols to the correct letter or number.

Finally, a bit of fun when computing the checksum. I translated the algorithm explained here, with the only difference that I wanted to avoid using yet another table to compute the final letter/number so instead I implemented the rule behind it (which boils down to ensuring ‘bit parity’).

Step four: enjoy it!

And possibly fork, improve and re-release :)

Misconfigured Python pretty printers in GDB

I noticed that running an executable in gdb displayed an error – and only the first time you run the program. The error reads:

(gdb) r
Starting program: /home/chris/workinprogress/cpp/book/a.out 
Traceback (most recent call last):
  File "/usr/share/gdb/auto-load/usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.19-gdb.py", line 63, in 
    from libstdcxx.v6.printers import register_libstdcxx_printers
ImportError: No module named 'libstdcxx'
[Inferior 1 (process 7215) exited normally]

The problem is, the libstdcxx directory is not in the path.
The very simple fix is to add the directory to the Python path in gdbinit.

$ cat ~/.gdbinit
python
import sys
sys.path.insert(0, '/usr/share/gcc-4.8/python')
from libstdcxx.v6.printers import register_libstdcxx_printers
register_libstdcxx_printers(None)
end