Navigate the stack backward

Took me a bit to figure it out correctly, so here it is: a recursive generator to navigate the stack backward (upward, if you want)

def getUpperFrames(frame):
    yield frame
    # f_back always exists on a frame object; it is None on the outermost frame
    if frame.f_back is None:
        return
    for frame in getUpperFrames(frame.f_back):
        yield frame

And if you’re on Python 3 – which you should be – then you get the bonus of using yield from instead of the for ... yield loop.
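For reference, here is a minimal sketch of what the yield from variant could look like on Python 3. The names get_upper_frames, outer and inner are mine, picked only for this example.

```python
import inspect


def get_upper_frames(frame):
    """Yield frame, then each of its callers, up to the outermost one."""
    if frame is None:
        # f_back of the outermost frame is None: nothing left to visit
        return
    yield frame
    yield from get_upper_frames(frame.f_back)


def outer():
    return inner()


def inner():
    frame = inspect.currentframe()
    return [f.f_code.co_name for f in get_upper_frames(frame)]


names = outer()
print(names[:2])  # ['inner', 'outer']
```

Anything after the first two names depends on how the script itself was launched.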

Create a valid ISIN

For a suite of tests I recently wrote, I had to create valid ISINs. The official page doesn’t give away much on how the final checksum digit should be computed and the examples on Wikipedia are, in my opinion, not particularly clear.

While I was trying to understand more, I stumbled upon a related unanswered question on Stack Overflow, which was after basically the same thing I was trying to do, so I took the time to answer it.

Here I report a slightly modified version of the code snippet linked above, to create the checksum digit for the first 11 characters of an ISIN.

import string

def digit_sum(n):
    return (n // 10) + (n % 10)

# Map '0'-'9' to 0-9 and 'A'-'Z' to 10-35
# (string.ascii_uppercase exists on both Python 2 and Python 3)
alphabet = {letter: value for (value, letter) in
            enumerate(string.digits + string.ascii_uppercase)}

def isinChecksumDigit(isin):
    isin_to_digits = ''.join(str(d) for d in (alphabet[v] for v in isin))
    isin_sum = 0
    for (i, c) in enumerate(reversed(isin_to_digits), 1):
        if i % 2 == 1:
            isin_sum += digit_sum(2*int(c))
        else:
            isin_sum += int(c)

    checksum_digit = -isin_sum % 10  # % already yields a value in 0..9
    return checksum_digit

Assuming countries is a list of valid country codes (ISO 3166-1 alpha-2, as used by ISO 6166), you call isinChecksumDigit as follows:

In [1]: isin = '{:2s}{:09d}'.format(random.choice(countries), random.randint(1, 10**9 - 1))

In [2]: isin
Out[2]: 'KR681111517'

In [3]: validIsin = isin + str(isinChecksumDigit(isin))

In [4]: validIsin
Out[4]: 'KR6811115171'
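The same routine can also be turned around to check a complete 12-character ISIN. The following is a self-contained sketch for Python 3, not part of the original answer. The names isin_checksum_digit and is_valid_isin are made up for this example, and it only checks the characters and the check digit, not whether the country code is a real one.

```python
import string

# Map '0'-'9' to 0-9 and 'A'-'Z' to 10-35
ALPHABET = {c: v for v, c in enumerate(string.digits + string.ascii_uppercase)}


def isin_checksum_digit(payload):
    """Check digit for the first 11 characters of an ISIN."""
    digits = ''.join(str(ALPHABET[c]) for c in payload)
    total = 0
    for i, d in enumerate(reversed(digits), 1):
        n = int(d)
        if i % 2 == 1:
            # Double every other digit, starting from the rightmost one,
            # and add up the digits of the result
            n *= 2
            n = n // 10 + n % 10
        total += n
    return -total % 10


def is_valid_isin(isin):
    """True if isin has 12 valid characters and a matching check digit."""
    return (len(isin) == 12
            and all(c in ALPHABET for c in isin)
            and isin[-1].isdigit()
            and isin_checksum_digit(isin[:11]) == int(isin[-1]))


print(is_valid_isin('KR6811115171'))  # True
print(is_valid_isin('KR6811115172'))  # False
```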

Unpickling GitPython datetimes

I’ve been playing around with GitPython recently, in an effort to analyse the relation between commits and software quality.

One by-product of this analysis was a Pandas Series of the number of commits on a given day. Since this turned out to be a time-consuming operation (as I needed to repoint head back in time for each day I was interested in), I opted to pickle the Series. Imagine the horror when, the day after I had run the script, I discovered that unpickling the data raised an exception.

In [4]: commits = pd.read_pickle('commits.pkl')
...
TypeError: __init__() takes at least 2 arguments (1 given)

That error comes from pickle_compat.py, part of the Pandas library.
However, no mention is made of which class actually raised it.

Entering %debug and going up and down the stack didn’t reveal much either, so I decided to go closer to the actual unpickling operation, using cPickle.

In [10]: commits = cPickle.load(open('commits.pkl'))
...
TypeError: ('__init__() takes at least 2 arguments (1 given)', <class 'git.objects.util.tzoffset'>, ())

Still an error, but a more meaningful one. Let’s see what a brief inspection of tzoffset shows.

In [11]: import git.objects.util

In [12]: git.objects.util.tzoffset?
Init signature: git.objects.util.tzoffset(self, secs_west_of_utc, name=None)
Docstring:
File: /opt/bats/lib/python2.7/site-packages/git/objects/util.py
Type: type

So __init__ expects a secs_west_of_utc positional argument (no default).

To still be able to unpickle your data without the need for running the script again, you just need to mock that class with a slightly modified one. Thank partial applications for that.

In [19]: from functools import partial
In [20]: git.objects.util.tzoffset = partial(git.objects.util.tzoffset, secs_west_of_utc=0)
In [21]: commits = pickle.load(open('commits.pkl'))
In [22]:

Job done – thank you functools!
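If partial is new to you, here is a tiny stand-alone sketch of the trick. TzOffset is a hypothetical stand-in written for this example, not GitPython's class. Note that the offset of 0 is only a placeholder to get the object constructed.

```python
from functools import partial


class TzOffset(object):
    """Hypothetical stand-in for a class whose __init__ has a required argument."""

    def __init__(self, secs_west_of_utc, name=None):
        self.secs_west_of_utc = secs_west_of_utc
        self.name = name


# Calling TzOffset() with no arguments raises TypeError.
# The partial object pre-fills the missing argument, so a bare call works.
make_tzoffset = partial(TzOffset, secs_west_of_utc=0)

tz = make_tzoffset()
print(tz.secs_west_of_utc)  # 0
```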

The connection string of a SQLAlchemy connection

In the middle of a Pdb session, while debugging a test, I found myself with a SQLAlchemy connection object, which was connected to… some database. To figure out which database it was connected to, I could scan the code to see where the connection had been initialised.

However, there’s a quicker way: looking at the _dsn variable of the underlying connection object – DSN standing for Data Source Name.

(Pdb) p conn.connection._dsn
'host=thehostname dbname=master_db_02 user=the_usr password=the_password connect_timeout=5 application_name=/usr/bin/nosetests'

Removing LaTeX commands using Python “re” module

Recently I had to sanitize lines in a .tex file where a \textcolor command had been used.
The command was being used the following way: {\textcolor{some_color}{text to color}}.

The main problem was that the command could appear any number of times in a line, so I couldn’t apply a substitution a set number of times.
Also, given that any color could have been used, a simple “blind replace” was clearly not a good weapon in this case.

I therefore resorted to applying a regex repeatedly until the line was cleaned of any \textcolor command.

In a nutshell:

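import re

def discolor(line):
    # Raw string: in a plain literal '\t' would be read as a tab character
    regex = re.compile(r'(.*?)\{\\textcolor\{.*?\}(\{.*?\})\}(.*)')
    while True:
        try:
            line = ''.join(regex.search(line).groups())
        except AttributeError:
            # search() returned None: no \textcolor command left
            return line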

The key part here is that we match not only the text inside the \textcolor command, but also what comes before and after it (the (.*?) and (.*) groups). We keep joining the groups back together until no \textcolor command is left: when that happens, re.search returns None, accessing .groups() raises an AttributeError, and we catch it and use it as a sentinel to know when to return.
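For comparison, a single re.sub call can do the same job, since it already replaces every non-overlapping match. This is a sketch of an alternative, not the approach I used at the time. It assumes that neither the color name nor the colored text contains nested braces, and discolor_sub is a name chosen for this example.

```python
import re

# Assumption: color names and colored text contain no nested braces
TEXTCOLOR = re.compile(r'\{\\textcolor\{[^{}]*\}(\{[^{}]*\})\}')


def discolor_sub(line):
    """Replace each {\\textcolor{color}{text}} with {text}."""
    return TEXTCOLOR.sub(r'\1', line)


sample = r'a {\textcolor{red}{hello}} b {\textcolor{blue}{world}} c'
print(discolor_sub(sample))  # a {hello} b {world} c
```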

Timezones and DST in Python

It’s incredible how fiddly it is to work with timezones.

Today, 14th of June—and this is important—I was trying to convert a made-up datetime from “Europe/London” to UTC.

I instinctively tried out this:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=pytz.timezone('Europe/London'))
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/London' GMT0:00:00 STD>)

At this point you will notice it didn’t take into account the DST offset (it should read BST).

As a further confirmation, converting to UTC keeps the same time:
>>> pytz.UTC.normalize(almostMidnight)
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<UTC>)

Notice this result would be fine during the winter, so depending on how much attention you pay and when you write the code, you might miss this bug – which is why I love having the same suite of tests always running on a system whose clock lives right after the upcoming DST change.

Even more subtly, if you were to convert to a different geographical timezone that observes DST, you would see this:
>>> almostMidnight.astimezone(pytz.timezone('Europe/Rome'))
datetime.datetime(2017, 6, 15, 1, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/Rome' CEST+2:00:00 DST>)

Interesting. Now DST is accounted for. So converting to geographical timezones might also mask the problem.

Long story short, the correct way *I believe* to convert a datetime object to UTC is to create a naive datetime object (no timezone info attached) representing local time, and then call the localize method of the timezone of interest. In code:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999)
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999)
>>> pytz.timezone('Europe/London').localize(almostMidnight).astimezone(pytz.UTC)
datetime.datetime(2017, 6, 14, 22, 59, 59, 999999, tzinfo=<UTC>)
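As a side note, on Python 3.9 or later the standard library zoneinfo module should avoid this trap. It works out the offset from the datetime's own wall-clock value, so attaching the timezone through tzinfo is fine. A minimal sketch, assuming the system has the IANA timezone database available:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library since Python 3.9

london = ZoneInfo("Europe/London")

# Same made-up instant as above, with the timezone attached directly
almost_midnight = datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=london)
as_utc = almost_midnight.astimezone(timezone.utc)

print(almost_midnight.utcoffset())  # 1:00:00
print(as_utc)                       # 2017-06-14 22:59:59.999999+00:00
```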

There’s a very nice read on timezones by Armin Ronacher, which I recommend.

Remote tail

You might have wondered why I felt the urge to specify “local” in the title of the last post. Well, fast forward a few days: for a similar set of tests I also needed to check a log file on a remote Linux machine – that is, I needed some kind of remote tail.

We already know select will be part of our tool set. On top of that, we’ll need to forward the command across the network – and a nice way of doing that is over SSH. In Python, this task is relatively simple if you choose to use paramiko, a third party library implementing the SSHv2 protocol.

A few caveats here as well. The following snippet is a raw prototype to demonstrate the functionality. It fit the bill for me, but YMMV. Many aspects can be improved, starting, for instance, with isBeginningOfMessage, which would be better placed in a derived class, so that different beginning-of-message (BOM) patterns can be handled. Closing the SSH channel cleanly is also something you might want to polish before using this class.

import paramiko
import re
import select
from collections import deque


class SSHTail(object):
    """Tail a remote file and store new messages in a queue
    """
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    TIMEOUT = 1000  # milliseconds
    BUF_SIZE = 1024
    NEWLINE_CHARS = {'\n', '\r'}

    def __init__(self, host, path):
        self.host = host
        self.path = path
        self.poller = select.poll()
        # deque lives in collections; Queue.deque only worked by accident
        self.messageQueue = deque()

    def start(self):
        """Start the tail command and return the queue used to
        store read messages
        """
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(self.host)
        self.client = client

        transport = self.client.get_transport()
        transport.set_keepalive(1)
        self.transport = transport

        channel = self.transport.open_session()
        channel.exec_command("tail -F %s" % self.path)
        self.channel = channel

        self.poller.register(self.channel, self.READ_ONLY)

        return self.messageQueue

    @staticmethod
    def isBeginningOfMessage(line):
        """Return True if the line starts with the hardcoded Beginning of
        Message pattern
        """
        BOMPattern = ''  # placeholder: an empty pattern matches every line
        return re.match(BOMPattern, line)

    def loop(self):
        """Whilst the SSH tunnel is active, keep polling for new
        content and call parseBuffer() to parse it into messages
        """
        while self.transport.is_active():
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                if flag & (select.POLLIN | select.POLLPRI):
                    buf = self.channel.recv(self.BUF_SIZE)
                    self.parseBuffer(buf)

    def parseBuffer(self, buf):
        """Given a buffer buf, split it into messages and glue it together to
        previous messages, if buf is not the beginning of a message.
        
        Note: assumes each message is on its own line.
        """
        if buf:
            messages = buf.splitlines()

            oldest = messages[0]
            if not self.isBeginningOfMessage(oldest):
                try:
                    messages[0] = self.messageQueue.popleft() + oldest
                except IndexError:
                    pass

            for message in messages:
                self.messageQueue.appendleft(message)
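The gluing logic in parseBuffer can be tried out on its own, without any SSH connection. The sketch below is a stand-alone rewrite for illustration. It assumes, hypothetically, that every message starts with "[", and BOM_PATTERN and parse_buffer are names made up for this example.

```python
import re
from collections import deque

# Hypothetical pattern: assume every message starts with "[", e.g. "[INFO] ..."
BOM_PATTERN = re.compile(r'\[')


def parse_buffer(queue, buf):
    """Split buf into lines; glue a leading fragment onto the newest message."""
    if not buf:
        return
    messages = buf.splitlines()
    if not BOM_PATTERN.match(messages[0]):
        try:
            # The newest message sits at the left end of the queue
            messages[0] = queue.popleft() + messages[0]
        except IndexError:
            pass  # nothing queued yet: keep the fragment as it is
    for message in messages:
        queue.appendleft(message)


queue = deque()
# The second message is cut in two by the buffer boundary
parse_buffer(queue, "[INFO] first\n[INFO] sec")
parse_buffer(queue, "ond\n[WARN] third\n")

print(list(reversed(queue)))
# ['[INFO] first', '[INFO] second', '[WARN] third']
```

Since new messages are pushed on the left, reading the queue from the right gives them in arrival order.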

Local tail

A while ago I needed, for one of my tests, to monitor a log file on a Linux system and store any new lines, so that I could access the added content at the end of the test. In a sense, I needed a kind of buffered tail -f on a local file.

A quick search led me to the select module.
Without further ado, here’s the code to watch one or more files, and to store anything added to those files in a message queue.

It’s a quick and dirty version which can be improved in many ways. For starters, the keys to access the message queues are the file objects themselves, pretty useless in general, but good enough in my case. Second, notice the file is never closed explicitly: definitely not ideal.

from collections import deque
import select


class Watcher(object):
    TIMEOUT = 1000  # milliseconds
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR

    def __init__(self):
        """Initialize the Watcher"""
        self.poller = select.poll()

        self.fd_to_socket = {}
        self.message_queues = {}

    def addFile(self, path):
        """Add a file to monitor.
        :path: absolute path of the file, including the filename
        """
        f = open(path)
        self.poller.register(f, self.READ_ONLY)
        self.fd_to_socket[f.fileno()] = f
        self.message_queues[f] = deque()

    def start(self):
        """Start polling files"""
        while True:
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                s = self.fd_to_socket[fd]
                if flag & (select.POLLIN | select.POLLPRI):
                    lines = s.readlines()
                    if lines:
                        # appendleft() takes a single item; extendleft()
                        # pushes each line in turn, newest ending up leftmost
                        self.message_queues[s].extendleft(lines)

Append an item to an OrderedDict

Update 2017/12/07. Since it seems this post is pretty popular… Beware: the following snippet abuses access to private fields and, more generally, relies on the internal details of another data structure. I don’t recommend using this approach in any code you rely on. OK for learning and investigating OrderedDict’s internals, not OK for prod. Use at your own risk.

I needed a way to append an item to an OrderedDict without creating a new object (too expensive), and I stumbled upon this answer on StackOverflow.

The answer gives a solution to the inverse problem (that is, prepending an item), but was good enough to be modified for my situation, without me needing to delve too much into the details of the OrderedDict data structure (it’s basically a linked list, under the hood).

Enough said, here it is for future reference:

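from collections import OrderedDict


class MyOrderedDict(OrderedDict):
    def append(self, key, value):
        # Relies on the pure-Python OrderedDict of Python 2.7, where each link
        # of the circular list is [prev, next, key] and root[0] is the last link
        root = self._OrderedDict__root
        last = root[0]

        if key in self:
            raise KeyError(key)
        else:
            root[0] = last[1] = self._OrderedDict__map[key] = [last, root, key]
            dict.__setitem__(self, key, value)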
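On Python 3 the private fields used above are not available, but as far as I can tell the public API covers both directions. Assigning a new key already puts it at the end, and move_to_end(key, last=False) moves a key to the front. A minimal sketch, where AppendableOrderedDict and its two methods are names made up for this example:

```python
from collections import OrderedDict


class AppendableOrderedDict(OrderedDict):
    def append(self, key, value):
        """Add a new key at the end; refuse to overwrite an existing one."""
        if key in self:
            raise KeyError(key)
        self[key] = value  # new keys always go to the end

    def prepend(self, key, value):
        """Add a new key at the front; refuse to overwrite an existing one."""
        if key in self:
            raise KeyError(key)
        self[key] = value
        self.move_to_end(key, last=False)  # public API, Python 3.2+


d = AppendableOrderedDict(a=1)
d.append('b', 2)
d.prepend('z', 0)
print(list(d.items()))  # [('z', 0), ('a', 1), ('b', 2)]
```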