Removing LaTeX commands using the Python “re” module

Recently I had to sanitize lines in a .tex file where a \textcolor command had been used.
The command was being used the following way: {\textcolor{some_color}{text to color}}.

The main problem was that the command could have appeared any number of times in a line, so I couldn’t apply a replacement a set number of times.
Also, given any color could have been used, a simple “blind replace” was clearly not a good weapon in this case.

I therefore resorted to applying a regex repeatedly until the line was cleaned of any \textcolor command.

In a nutshell:

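import re


def discolor(line):
    # Raw string: otherwise the '\t' in '\textcolor' is read as a tab.
    # re.DOTALL lets the last group keep a trailing newline, if any.
    regex = re.compile(r'(.*?)\{\\textcolor\{.*?\}(\{.*?\})\}(.*)', re.DOTALL)
    while True:
        try:
            # text before + {text to color} + text after
            line = ''.join(regex.search(line).groups())
        except AttributeError:
            # search() returned None: no \textcolor command is left
            return line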

The key part here is that we match not only the text inside the \textcolor command, but also what comes before and after it (the (.*?) and (.*) groups). We keep joining the groups back together until no \textcolor command is left: when that happens the search returns None, accessing .groups() raises an AttributeError, and we catch it and use it as a sentinel to know when to return.
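
For comparison, here is a minimal sketch of the same clean-up done with re.sub. It is not the code I used: discolor_sub is a made-up name, and the pattern assumes that neither the color name nor the colored text contains braces, except for nested \textcolor commands, which are peeled off one pass at a time. Unlike the loop above, it also drops the braces around the text.

```python
import re

# Assumption: color names and colored text contain no braces of their own.
TEXTCOLOR = re.compile(r'\{\\textcolor\{[^{}]*\}\{([^{}]*)\}\}')


def discolor_sub(line):
    # Substitute every occurrence in one pass, then repeat until nothing
    # changes, so that nested commands are removed from the inside out.
    previous = None
    while previous != line:
        previous = line
        line = TEXTCOLOR.sub(r'\1', line)
    return line


print(discolor_sub(r'a {\textcolor{red}{b}} c {\textcolor{blue}{d}}'))
```

re.sub already replaces every non-overlapping match, so the loop only matters for nested commands.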

Timezones and DST in Python

It’s incredible how fiddly it is to work with timezones.

Today, 14th of June—and this is important—I was trying to convert a made-up datetime from “Europe/London” to UTC.

I instinctively tried out this:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=pytz.timezone('Europe/London'))
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/London' GMT0:00:00 STD>)

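At this point you will notice it didn’t take into account the DST offset (it should read BST): a pytz timezone passed through tzinfo is attached with the zone’s default offset, not the one in force on that date.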

As a further confirmation, converting to UTC keeps the same time:
>>> pytz.UTC.normalize(almostMidnight)
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<UTC>)

Notice this result would be fine during the winter, so depending on how much attention you pay and when you write the code, you might miss this bug. That is why I love having the same suite of tests always running on a system whose clock sits right after the upcoming DST change.

Even more subtly, if you were to convert to a different geographical timezone, one that observes DST, you would see this:
>>> almostMidnight.astimezone(pytz.timezone('Europe/Rome'))
datetime.datetime(2017, 6, 15, 1, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/Rome' CEST+2:00:00 DST>)

Interesting. Now DST is accounted for. So converting to geographical timezones might also mask the problem.

Long story short, the correct way *I believe* to convert a datetime object to UTC is to create a naive datetime object (no timezone info attached) representing local time, and then call the “localize” method of the timezone of interest. In code:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999)
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999)
>>> pytz.timezone('Europe/London').localize(almostMidnight).astimezone(pytz.UTC)
datetime.datetime(2017, 6, 14, 22, 59, 59, 999999, tzinfo=<UTC>)

There’s a very nice read on timezones by Armin Ronacher, which I recommend.

Remote tail

You might have wondered why I felt the urge to specify “local” in the title of the last post. Well, a few days later, for a similar set of tests, I also needed to check a log file on a remote Linux machine; that is, I needed some kind of remote tail.

ssh tail -f

We already know select will be part of our tool set. On top of that, we’ll need to forward the command across the network – and a nice way of doing that is over SSH. In Python, this task is relatively simple if you choose to use paramiko, a third party library implementing the SSHv2 protocol.

A few caveats here, as well. The following snippet is a raw prototype to demonstrate the functionality. It fit the bill for me, but YMMV. Of course many aspects can be improved, starting for instance with isBeginningOfMessage, which would be much better placed in a derived class, so that different BOM (Beginning of Message) patterns can be handled. Closing the SSH channel cleanly is also something you might want to polish before using this class.

import paramiko
import re
import select
from collections import deque


class SSHTail(object):
    """Tail a remote file and store new messages in a queue
    """
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    TIMEOUT = 1000  # milliseconds
    BUF_SIZE = 1024
    NEWLINE_CHARS = {'\n', '\r'}

    def __init__(self, host, path):
        self.host = host
        self.path = path
        self.poller = select.poll()
        self.messageQueue = deque()

    def start(self):
        """Start the tail command and return the queue used to
        store read messages
        """
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(self.host)
        self.client = client

        transport = self.client.get_transport()
        transport.set_keepalive(1)
        self.transport = transport

        channel = self.transport.open_session()
        channel.exec_command("tail -F %s" % self.path)
        self.channel = channel

        self.poller.register(self.channel, self.READ_ONLY)

        return self.messageQueue

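    @staticmethod
    def isBeginningOfMessage(line):
        """Return True if the line starts with the hardcoded Beginning of
        Message pattern
        """
        # Placeholder: an empty pattern matches every line, so set this to
        # whatever your messages actually start with
        BOMPattern = ''
        return re.match(BOMPattern, line) is not None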

    def loop(self):
        """Whilst the SSH tunnel is active, keep polling for new
        content and call parseBuffer() to parse it into messages
        """
        while self.transport.is_active():
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                if flag & (select.POLLIN | select.POLLPRI):
                    buf = self.channel.recv(self.BUF_SIZE)
                    self.parseBuffer(buf)

    def parseBuffer(self, buf):
        """Given a buffer buf, split it into messages and glue it together to
        previous messages, if buf is not the beginning of a message.
        
        Note: assumes each message is on its own line.
        """
        if buf:
            messages = buf.splitlines()

            oldest = messages[0]
            if not self.isBeginningOfMessage(oldest):
                try:
                    messages[0] = self.messageQueue.popleft() + oldest
                except IndexError:
                    pass

            for message in messages:
                self.messageQueue.appendleft(message)
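
To see the gluing logic of parseBuffer in isolation, here is a minimal sketch that needs no SSH connection. The store function and the timestamp-based BOM_PATTERN are made up for the example; use whatever pattern your log messages really start with.

```python
import re
from collections import deque

# Hypothetical Beginning of Message pattern: a line starting with a date
# such as "2017-06-14 ". Adjust it to your own log format.
BOM_PATTERN = re.compile(r'\d{4}-\d{2}-\d{2} ')


def store(queue, buf):
    """Split buf into lines and push them on the left of the queue,
    gluing the first line to the newest stored message if it is not
    the beginning of a new message."""
    messages = buf.splitlines()
    if not messages:
        return
    if queue and not BOM_PATTERN.match(messages[0]):
        messages[0] = queue.popleft() + messages[0]
    for message in messages:
        queue.appendleft(message)


queue = deque()
store(queue, "2017-06-14 first mess")             # message cut by the buffer size
store(queue, "age\n2017-06-14 second message\n")  # the rest arrives later
print(list(queue))
```

The newest message always sits on the left, which is why the gluing step pops from the left as well.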

Local tail

tail -f

A while ago I needed, for one of my tests, to monitor a log file on a Linux system and store any new lines, so that I could access the added content at the end of the test. In a sense, I needed a kind of buffered tail -f on a local file.

A quick search led me to the select module.
Without further ado, here’s the code to watch one or more files, and to store anything added to those files in a message queue.

It’s a quick and dirty version which can be improved in many ways. For starters, the keys to access the message queues are the file objects themselves (called sockets in the code), pretty useless in general, but good enough in my case. Second, notice the files are never closed explicitly: definitely not ideal.

import select
from collections import deque


class Watcher(object):
    TIMEOUT = 1000  # milliseconds
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR

    def __init__(self):
        """Initialize the Watcher"""
        self.poller = select.poll()

        self.fd_to_socket = {}
        self.message_queues = {}

    def addFile(self, path):
        """Add a file to monitor.
        :path: absolute path of the file, including the filename
        """
        f = open(path)
        self.poller.register(f, self.READ_ONLY)
        self.fd_to_socket[f.fileno()] = f
        self.message_queues[f] = deque()

    def start(self):
        """Start polling files"""
        while True:
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                s = self.fd_to_socket[fd]
                if flag & (select.POLLIN | select.POLLPRI):
                    lines = s.readlines()
                    if lines:
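                        # appendleft() takes a single item: use extendleft()
                        # to push all the new lines, newest on the left
                        self.message_queues[s].extendleft(lines)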
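
A note on the queue itself: when several lines have to be stored at once in a deque, keep in mind that appendleft accepts exactly one item, whereas extendleft accepts an iterable and pushes its items one by one. A small sketch with made-up lines:

```python
from collections import deque

queue = deque()
queue.appendleft("line 1\n")                 # one item at a time
queue.extendleft(["line 2\n", "line 3\n"])   # each item is pushed on the left
print(list(queue))                           # newest line first

try:
    queue.appendleft("line 4\n", "line 5\n")  # more than one argument
except TypeError as error:
    print("appendleft refused two items:", error)
```

After extendleft the newest line is the leftmost one, the same order you would get by calling appendleft once per line.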

Append an item to an OrderedDict

I needed a way to append an item to an OrderedDict without creating a new object (too expensive), and I stumbled upon this answer on StackOverflow.

The answer gives a solution to the inverse problem (that is, prepending an item), but was good enough to be modified for my situation, without me needing to delve too much into the details of the OrderedDict data structure (it’s basically a linked list, under the hood).

Enough said, here it is for future reference:

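from collections import OrderedDict


class MyOrderedDict(OrderedDict):
    def append(self, key, value):
        # Relies on private attributes of the pure-Python OrderedDict
        # (e.g. Python 2.7), where each link is [previous, next, key]
        root = self._OrderedDict__root
        last = root[0]

        if key in self:
            raise KeyError(key)
        else:
            root[0] = last[1] = self._OrderedDict__map[key] = [last, root, key]
            dict.__setitem__(self, key, value)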
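
For comparison, here is a minimal sketch that sticks to the public API, assuming Python 3.2 or later (move_to_end does not exist before that). The keys and values are made up.

```python
from collections import OrderedDict

od = OrderedDict([("a", 1), ("b", 2)])

od["c"] = 3           # a new key is placed after the existing ones
print(list(od))

od["a"] = 10          # re-assigning an existing key keeps its position...
od.move_to_end("a")   # ...unless it is moved to the end explicitly
print(list(od))
```

This does not raise a KeyError when the key is already present, so it is not a drop-in replacement for the append method above.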

A web-app to give you your next bus arrival time

404 bus. Actually found!

You go to the bus stop and, by Murphy’s Law, either a) the bus has just gone or b) you’ll wait an endless amount of time for the bus to arrive.
Or at least this is what I experience most of the time.

That’s why I coded a small web-app that uses the TfL live stream data and the HTML5 Geolocation functionality to retrieve a list of the bus stops within 300 m of your location and the list of buses due to arrive from that moment on. And if it can’t find your location, you’re given the possibility to input a postcode which, thanks to the data at OrdnanceSurvey and some good math, is converted into canonical coordinates and used to present a similar result.
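
The “within 300 m” filter can be sketched with the haversine formula. This is only an illustration and not necessarily how the app does it: distance_m, stops_within and the sample stops are made up, and the Earth is assumed to be a sphere with a radius of 6371 km.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6371000  # assumption: mean Earth radius, in metres


def distance_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two (lat, lon) points
    given in decimal degrees, using the haversine formula."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlambda = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_M * asin(sqrt(a))


def stops_within(stops, lat, lon, radius_m=300):
    """Return the names of the stops at most radius_m metres away."""
    return [name for name, stop_lat, stop_lon in stops
            if distance_m(lat, lon, stop_lat, stop_lon) <= radius_m]


# Made-up stops: (name, latitude, longitude)
stops = [("near", 51.502, -0.1), ("far", 51.505, -0.1)]
print(stops_within(stops, 51.5, -0.1))
```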

It was a very nice excuse for me to learn more about Flask (amazingly fast to use), HTML, CSS (first time I used Bootstrap!) and JS, and web-apps in general.

The solution is far from complete and several improvements are possible (handle errors with nice landing pages, allow the user to input a postcode regardless of whether geolocation worked, add links to maps, …), but as it was more of an experiment, I’m happy with the result so far. It’s hosted at OpenShift. Give it a spin and check out the code.

Decoding RM4SCC for fun

I recently got curious about the bar code I could sometimes find on letters addressed to me. I noticed that it uses just 4 symbols, always begins and ends in the same way, and is the same on all my letters, regardless of the sender.

Armed with this basic information, after a bit of research I found out that the code is called Royal Mail 4-State Customer Code. Even more curious, I decided to write a simple decoder for it and, all of a sudden, all the knowledge in signal processing and telecommunication systems came vividly back to my mind, years after I took those classes (which I very much enjoyed, I must admit). Here is how I did it.

TL;DR: I put the code for the rm4sccdec (RM4SCC decoder) on GitHub. Use it at your own risk, as it’s not production-ready and needs some tweaking to reliably scan all types of image. I’ve used Python with OpenCV and numpy.

Step one: image pre-processing

The code does not include any information in the colour, which means we can simply get rid of the colour information and transform the image to greyscale.

Next, we want to maximise the “distance” between the information (the bars) and the noise (the background): this is usually done by thresholding the image. Using a hand-picked global value for thresholding does not always give good results, especially when different areas of the image are characterised by different illumination. Some more advanced techniques, such as Otsu’s thresholding method (which I used in my decoder, and which picks the threshold from the image histogram), are a better fit.

Finally, it is possible to have some residual noise, due to the thresholding process, whereby some white pixels are present in black areas and vice-versa. This is called salt’n’pepper noise and can effectively be filtered with median filters, which substitute the value of a pixel with the median of those around. The great advantage is that it preserves the edges of the image.
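
To make the median filter concrete, here is a minimal pure-Python sketch on a tiny made-up image stored as a list of rows. The decoder itself relies on OpenCV for this step; median_filter below is a hypothetical helper written only for illustration, and it simply clips the window at the image borders.

```python
def median_filter(image, radius=1):
    """Return a copy of image where every pixel is replaced by the median
    of the pixels in the (2 * radius + 1)-wide square window around it."""
    height, width = len(image), len(image[0])
    result = []
    for y in range(height):
        row = []
        for x in range(width):
            window = sorted(
                image[j][i]
                for j in range(max(0, y - radius), min(height, y + radius + 1))
                for i in range(max(0, x - radius), min(width, x + radius + 1)))
            # Taking the middle element keeps the output to existing values
            row.append(window[len(window) // 2])
        result.append(row)
    return result


# A black image with a single white ("salt") pixel in the middle
noisy = [[0] * 5 for _ in range(5)]
noisy[2][2] = 255
print(median_filter(noisy))

# A vertical edge: two black columns followed by three white ones
edge = [[0, 0, 255, 255, 255] for _ in range(5)]
print(median_filter(edge) == edge)
```

The isolated white pixel disappears, while the edge between the black and the white area stays exactly where it was.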

Step two: feature selection, extraction, and classification

Now we can start thinking about the features defining our symbols. We know we have 4 symbols, which we can call ascender, descender, long (Full Height, in the image), and short (Tracker, in the image).

The 4 symbols used in RM4SCC, from Wikipedia

The first obvious feature we can select is the vertical position of each bar. After all, that’s the information we need to decode the codeword. However, if we choose the 4 points determining each bar, we’d probably end up complicating the decoding process too much.

An easy way out is to choose the centroid position (just its y-coordinate would be necessary) for each bar. Notice, though, that the long and short bars will share the same feature. If we go along this path, we need another feature to distinguish (at least) the long bar from the short bar. The second obvious feature is therefore the size or, more accurately, the area. This feature will allow us to easily distinguish long from short, but it will be pretty useless for telling the ascender from the descender.

For the extraction, we need to segment the image and find all the bars, and compute the so-called moments for each of them. The first three moments will be enough for us to get all the features we are interested in.
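
Here is a minimal pure-Python sketch of how the area and the centroid come out of the first three raw moments (m00, m10 and m01). The decoder gets them from OpenCV; the functions and the tiny 5x3 masks below are made up for illustration, with y growing downwards as in image coordinates.

```python
def moments(mask):
    """Return the raw moments (m00, m10, m01) of a binary mask."""
    m00 = m10 = m01 = 0
    for y, row in enumerate(mask):
        for x, value in enumerate(row):
            if value:
                m00 += 1   # number of pixels: the area
                m10 += x   # sum of the x-coordinates
                m01 += y   # sum of the y-coordinates
    return m00, m10, m01


def features(mask):
    """Return the area and the centroid of the only segment in mask."""
    m00, m10, m01 = moments(mask)
    return {"area": m00, "cx": m10 / m00, "cy": m01 / m00}


def bar(first_row, last_row, height=5, width=3):
    """Build a mask with a one-pixel-wide bar in the middle column."""
    return [[1 if x == 1 and first_row <= y <= last_row else 0
             for x in range(width)] for y in range(height)]


for name, mask in [("ascender", bar(0, 2)), ("descender", bar(2, 4)),
                   ("long", bar(0, 4)), ("short", bar(2, 2))]:
    print(name, features(mask))
```

The long and short bars share the same cy and differ only in area, while the ascender and the descender share the same area and differ only in cy.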

As a side note, as the segmentation function I have used does not return all segments in order, I had to extract the x-coordinate of each bar so as to be able to sort the vector of symbols.

If the code scanned is reasonably horizontal, we should be able to classify all four symbols pretty easily. For this bit I resorted to K-means clustering, although other classification methods can be used with similar results.

Step three: the actual decoding

If we don’t consider the starting and ending symbols, all the symbols in between come in groups of four. For this reason we first need to build a dictionary that maps every valid group of 4 symbols to the correct letter or number.
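
As a sketch of how such a dictionary could be generated rather than typed in by hand: the version below assumes the scheme as I understand it from the Wikipedia description, where in every group exactly two bars extend upwards and exactly two extend downwards, and the two halves are weighted 4, 2, 1, 0 to pick the row and the column of a 6x6 table running from 0 to Z. The letters A, D, F, T (ascender, descender, long, short) and the build_table name are made up; please check the result against the official specification before relying on it.

```python
from itertools import product

ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
WEIGHTS = (4, 2, 1, 0)  # assumption: weights of the four bar positions


def half_index(flags):
    """Map a 2-of-4 pattern to an index 0..5 (a weighted sum of 6 wraps
    around to the last row or column)."""
    value = sum(weight for weight, on in zip(WEIGHTS, flags) if on)
    return (value - 1) % 6


def build_table():
    """Return a dict mapping groups such as 'TTFF' to their character."""
    table = {}
    for group in product("ADFT", repeat=4):
        top = [symbol in "AF" for symbol in group]      # bar extends upwards
        bottom = [symbol in "DF" for symbol in group]   # bar extends downwards
        if sum(top) != 2 or sum(bottom) != 2:
            continue  # not a valid group under the assumed scheme
        index = half_index(top) * 6 + half_index(bottom)
        table["".join(group)] = ALPHABET[index]
    return table


table = build_table()
print(len(table))
```

Exactly 36 of the 256 possible groups are valid, one per digit or letter.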

Finally, a bit of fun when computing the checksum. I translated the algorithm explained here, with the only difference that I wanted to avoid using yet another table to compute the final letter/number so instead I implemented the rule behind it (which boils down to ensuring ‘bit parity’).

Step four: enjoy it!

And possibly fork, improve and re-release :)

Asserting that an exception has not been raised

A quick way to test that some method does not raise an exception is to call it inside a try block and fail the test if that exception is caught. Sounds logical (and it is) but I keep forgetting it. So here it is for my future me.

Suppose you have a method that raises an exception if it can’t open a file.

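def read_first_line(self, filename):
    try:
        with open(filename, 'r') as f:
            self.first_line = f.readline()
    except IOError:
        # needs "import sys" at the top of the module (plus
        # "from __future__ import print_function" on Python 2)
        print("No such file {0}".format(filename), file=sys.stderr)
        raise  # re-raise, so that the caller sees the exception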

Testing that it works correctly when a file does exist is quite simple at this point:

def test_no_exception_is_raised_if_file_exists(self):
    # suppose your instance is in self.sut
    try:
        self.sut.read_first_line('file_that_exists.txt')
    except IOError:
        self.fail("IOError raised but should not have")
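
For completeness, here is a self-contained sketch of the same test that can be run as is. The Reader class is a made-up stand-in for the class under test, and the file that exists is created with tempfile so that the test does not depend on the working directory.

```python
import os
import tempfile
import unittest


class Reader(object):
    """Hypothetical class under test."""

    def read_first_line(self, filename):
        # open() raises IOError (OSError on Python 3) if the file is missing
        with open(filename, 'r') as f:
            self.first_line = f.readline()


class ReaderTest(unittest.TestCase):
    def setUp(self):
        self.sut = Reader()
        handle, self.path = tempfile.mkstemp()
        with os.fdopen(handle, 'w') as f:
            f.write("first\nsecond\n")

    def tearDown(self):
        os.remove(self.path)

    def test_no_exception_is_raised_if_file_exists(self):
        try:
            self.sut.read_first_line(self.path)
        except IOError:
            self.fail("IOError raised but should not have")
        self.assertEqual(self.sut.first_line, "first\n")


suite = unittest.defaultTestLoader.loadTestsFromTestCase(ReaderTest)
result = unittest.TextTestRunner().run(suite)
```

An exception that is not caught would already make the test error out; the explicit self.fail turns it into a failure with a clearer message.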

Pdb cheat sheet

I often need a table with all the commands for the Python debugger (Pdb) but so far I couldn’t find a comprehensive one. So I wrote one. Enjoy!

h(elp) or ?             print available commands
h(elp) command          print help about command
q(uit)                  quit debugger
! stmt or exec stmt     execute the one-line statement stmt
p(rint) expr            print the value of expression
pp expr                 pretty-print the value of expression
whatis obj              print the type of obj
bt or w(here)           print stack trace from oldest frame to newest
l(ist)                  list 11 lines of source code around the current line
l m, n                  list from line m to line n
args                    print the arguments of the current function
alias [name [command [parameter parameter ...] ]]
                        create an alias called name that executes command
unalias name            remove alias
<ENTER>                 repeat the last command entered
n(ext)                  execute the current statement (step over)
s(tep)                  step into function
j(ump) n                set line n as the next line to be executed
r(eturn)                continue execution until the current function returns
c(ontinue)              continue execution until a breakpoint is encountered
unt(il)                 continue execution until a line greater than the current one is reached or the current frame returns
run or restart          (re-)run the program
u(p)                    move one level up in the stack trace
d(own)                  move one level down in the stack trace
b(reak)                 show breakpoints
tb(reak) [filename:]n   set a temporary breakpoint at line n of file filename
b [filename:]n          set a breakpoint at line n of file filename
b fun                   set a breakpoint at function fun
disable bn1 [bn2, ...]  disable breakpoint(s) bn1, bn2, …
enable bn1 [bn2, ...]   enable breakpoint(s) bn1, bn2, …
clear [bn]              clear breakpoint number bn; if no number is specified, clear them all
commands [bn]           specify commands to run at breakpoint bn
condition bn expr       expression expr must evaluate to true in order for breakpoint bn to be hit; if no expression is specified, the breakpoint is made unconditional