Removing LaTeX commands with Python's “re” module

Recently I had to sanitize lines in a .tex file where a \textcolor command had been used.
The command was used in the following way: {\textcolor{some_color}{text to color}}.

The main problem was that the command could appear any number of times in a line, so I couldn't simply apply a replacement a fixed number of times.
Also, given that any color could have been used, a simple “blind replace” was clearly not the right weapon in this case.

I therefore resorted to applying a regex repeatedly until the line was cleaned of any \textcolor command.

In a nutshell:

import re


def discolor(line):
    # group 1: text before the command, group 2: the braced text that was
    # being colored, group 3: the rest of the line
    regex = re.compile(r'(.*?)\{\\textcolor\{.*?\}(\{.*?\})\}(.*)')
    while True:
        try:
            line = ''.join(re.search(regex, line).groups())
        except AttributeError:
            return line

The key part here is that we match not only the text inside the \textcolor command, but also whatever comes before and after it (the first and last groups). We join the groups back together and repeat until no \textcolor is left: at that point re.search returns None, so accessing .groups() raises an AttributeError, which we catch and use as a sentinel to know when to return.
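
For instance, a quick check on a made-up line (this example is mine, not from the original post; note that the braces around the colored text are kept, which is harmless in LaTeX since they just form a group):

>>> discolor(r'Some {\textcolor{red}{very important}} text')
'Some {very important} text'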

Timezones and DST in Python

It’s incredible how fiddly it is to work with timezones.

Today, 14th of June—and this is important—I was trying to convert a made-up datetime from “Europe/London” to UTC.

I instinctively tried out this:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=pytz.timezone('Europe/London'))
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/London' GMT0:00:00 STD>)

At this point you will notice it didn't take the DST offset into account (the timezone should read BST, not GMT).

As a further confirmation, converting to UTC keeps the same time:
>>> pytz.UTC.normalize(almostMidnight)
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999, tzinfo=<UTC>)

Notice this result would be fine during the winter, so depending on how much attention you pay and on when you write the code, you might miss this bug entirely – which is why I love having the same suite of tests always running on a system whose clock is set just past the next DST change.
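
For instance (an example of mine rather than the original session), the same naive attachment happens to give the right answer in January, when London is on GMT:

>>> january = datetime(2017, 1, 14, 23, 59, 59, 999999, tzinfo=pytz.timezone('Europe/London'))
>>> pytz.UTC.normalize(january)
datetime.datetime(2017, 1, 14, 23, 59, 59, 999999, tzinfo=<UTC>)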

Even subtler: if you were to convert to a different geographical timezone, one that observes DST, you would see this:
>>> almostMidnight.astimezone(pytz.timezone('Europe/Rome'))
datetime.datetime(2017, 6, 15, 1, 59, 59, 999999, tzinfo=<DstTzInfo 'Europe/Rome' CEST+2:00:00 DST>)

Interesting. Now DST is accounted for. So converting to geographical timezones might also mask the problem.

Long story short, the correct way *I believe* to convert the timezone of a datetime object to UTC is to create a naive datetime object (no timezone info attached) representing local time, and then call the localize() method of the timezone of interest. In code:
>>> almostMidnight = datetime.now().replace(hour=23, minute=59, second=59, microsecond=999999)
>>> almostMidnight
datetime.datetime(2017, 6, 14, 23, 59, 59, 999999)
>>> pytz.timezone('Europe/London').localize(almostMidnight).astimezone(pytz.UTC)
datetime.datetime(2017, 6, 14, 22, 59, 59, 999999, tzinfo=<UTC>)

There’s a very nice read on timezones by Armin Ronacher, which I recommend.

Remote tail

You might have wondered why I felt the urge to specify “local” in the title of the last post. Well, fast forward a few days, and for a similar set of tests I also needed to check a log file on a remote Linux machine – that is, I needed some kind of remote tail.

ssh tail -f

We already know select will be part of our tool set. On top of that, we’ll need to forward the command across the network – and a nice way of doing that is over SSH. In Python, this task is relatively simple if you choose to use paramiko, a third party library implementing the SSHv2 protocol.

A few caveats here as well. The following snippet is a raw prototype to demonstrate the functionality: it fit the bill for me, but YMMV. Of course many aspects can be improved, starting for instance with isBeginningOfMessage, which would be better placed in a derived class so that different BOM patterns can be handled. Closing the SSH channel cleanly is also something you might want to polish before using this class.

import paramiko
import re
import select
import Queue


class SSHTail(object):
    """Tail a remote file and store new messages in a queue
    """
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    TIMEOUT = 1000  # milliseconds
    BUF_SIZE = 1024
    NEWLINE_CHARS = {'\n', '\r'}

    def __init__(self, host, path):
        self.host = host
        self.path = path
        self.poller = select.poll()
        self.messageQueue = Queue.deque()  # Queue re-exports collections.deque in Python 2

    def start(self):
        """Start the tail command and return the queue used to
        store read messages
        """
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(self.host)
        self.client = client

        transport = self.client.get_transport()
        transport.set_keepalive(1)
        self.transport = transport

        channel = self.transport.open_session()
        channel.exec_command("tail -F %s" % self.path)
        self.channel = channel

        self.poller.register(self.channel, self.READ_ONLY)

        return self.messageQueue

    @staticmethod
    def isBeginningOfMessage(line):
        """Return True if the line starts with the hardcoded Beginning of
        Message pattern
        """
        # Placeholder: the empty pattern matches every line, so every line
        # is treated as the start of a new message. Override as needed.
        BOMPattern = ''
        return re.match(BOMPattern, line)

    def loop(self):
        """Whilst the SSH tunnel is active, keep polling for new
        content and call parseBuffer() to parse it into messages
        """
        while self.transport.is_active():
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                if flag & (select.POLLIN | select.POLLPRI):
                    buf = self.channel.recv(self.BUF_SIZE)
                    self.parseBuffer(buf)

    def parseBuffer(self, buf):
        """Given a buffer buf, split it into messages and glue the first one
        onto the previous message if buf does not start a new message.

        Note: assumes each message is on its own line.
        """
        if buf:
            messages = buf.splitlines()

            oldest = messages[0]
            if not self.isBeginningOfMessage(oldest):
                try:
                    messages[0] = self.messageQueue.popleft() + oldest
                except IndexError:
                    pass

            for message in messages:
                self.messageQueue.appendleft(message)
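
To give an idea of how the class is meant to be used (the hostname and path below are placeholders): loop() blocks while the SSH transport is up, so you would typically run it in a background thread and consume the queue from the main one.

import threading

tail = SSHTail('logserver.example.com', '/var/log/myapp.log')
queue = tail.start()

# loop() polls forever while the transport is active, so run it in
# a daemon thread and read messages off `queue` from the main thread
worker = threading.Thread(target=tail.loop)
worker.daemon = True
worker.start()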

Local tail

tail -f

A while ago I needed, for one of my tests, to monitor a log file on a Linux system and store any new lines, so that I could access the added content at the end of the test. In a sense, I needed a kind of buffered tail -f on a local file.

A quick search led me to the select module.
Without further ado, here’s the code to watch one or more files, and to store anything added to those files in a message queue.

It’s a quick and dirty version which can be improved in many ways. For starters, the keys used to access the message queues are the open file objects themselves – pretty awkward in general, but good enough in my case. Second, notice the file is never closed explicitly: definitely not ideal.

import Queue
import select


class Watcher(object):
    TIMEOUT = 1000
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR

    def __init__(self):
        """Initialize the Watcher"""
        self.poller = select.poll()

        self.fd_to_socket = {}
        self.message_queues = {}

    def addFile(self, path):
        """Add a file to monitor.
        :path: absolute path of the file, including the filename
        """
        f = open(path)
        self.poller.register(f, self.READ_ONLY)
        self.fd_to_socket[f.fileno()] = f 
        self.message_queues[f] = Queue.deque()

    def start(self):
        """Start polling files"""
        while True:
            events = self.poller.poll(self.TIMEOUT)
            for fd, flag in events:
                s = self.fd_to_socket[fd]
                if flag & (select.POLLIN | select.POLLPRI):
                    lines = s.readlines()
                    if lines:
                        # extendleft: deque.appendleft only takes one item
                        self.message_queues[s].extendleft(lines)
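
A minimal usage sketch (the path is made up): register the files you care about first, since start() then loops forever, appending any new lines to the corresponding queue in message_queues.

watcher = Watcher()
watcher.addFile('/var/log/myapp.log')  # hypothetical log file
watcher.start()  # blocks, storing any new lines as they are written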

Append an item to an OrderedDict

I needed a way to append an item to an OrderedDict without creating a new object (too expensive), and I stumbled upon this answer on Stack Overflow.

The answer gives a solution to the inverse problem (that is, prepending an item), but it was easy enough to adapt to my situation without delving too much into the details of the OrderedDict data structure (under the hood it keeps a doubly linked list of its keys alongside the dict).

Enough said, here it is for future reference:

from collections import OrderedDict


class MyOrderedDict(OrderedDict):
    def append(self, key, value):
        # __root and __map are the name-mangled internals of the
        # pure-Python OrderedDict (Python 2.7)
        root = self._OrderedDict__root
        last = root[0]

        if key in self:
            raise KeyError
        else:
            root[0] = last[1] = self._OrderedDict__map[key] = [last, root, key]
            dict.__setitem__(self, key, value)
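
A quick sanity check (note that this relies on the pure-Python OrderedDict of Python 2.7; the C implementation shipped with recent Python 3 versions does not expose the __root and __map internals):

>>> d = MyOrderedDict([('a', 1), ('b', 2)])
>>> d.append('c', 3)
>>> d.items()
[('a', 1), ('b', 2), ('c', 3)]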

Decoding RM4SCC for fun

I recently got curious about the barcode I sometimes found on letters addressed to me. I noticed it uses just 4 symbols, always begins and ends the same way, and is the same on all letters, regardless of the sender.

Armed with this basic information, after a bit of research I found out that the code is called Royal Mail 4-State Customer Code. Even more curious, I decided to write a simple decoder for it and, all of a sudden, all the knowledge in signal processing and telecommunications systems returned vividly to my mind, years after I took those classes (which I very much enjoyed, I must admit). Here is how I did it.

TL;DR: I put the code for rm4sccdec (an RM4SCC decoder) on GitHub. Use it at your own risk, as it’s not production-ready and needs some tweaking to reliably scan all types of images. I’ve used Python with OpenCV and numpy.

Step one: image pre-processing

The code does not include any information in the colour, which means we can simply get rid of the colour information and transform the image to greyscale.

Next, we want to maximise the “distance” between the information (the bars) and the noise (the background): this is usually done by thresholding the image. Using a global threshold does not always give good results, especially when different areas of the image have different illumination. Some more advanced techniques, such as Otsu’s thresholding method (which I used in my decoder), are a better fit.

Finally, some residual noise from the thresholding process is possible, whereby some white pixels are present in black areas and vice versa. This is called salt-and-pepper noise and can be effectively removed with a median filter, which replaces the value of each pixel with the median of its neighbours. The great advantage of the median filter is that it preserves the edges in the image.
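
In OpenCV this whole pipeline is only a few lines. The snippet below is just a sketch of the idea (the filename and the kernel size are arbitrary), not the exact code of the decoder:

import cv2

# hypothetical scan of an envelope carrying an RM4SCC code
img = cv2.imread('envelope.png', 0)  # 0 = load as greyscale

# Otsu picks the global threshold automatically from the histogram
_, binary = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

# a small median filter removes the residual salt-and-pepper noise
# while preserving the edges of the bars
clean = cv2.medianBlur(binary, 3)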

Step two: feature selection, extraction, and classification

Now we can start thinking about the features defining our symbols. We know we have 4 symbols, which we can call ascender, descender, long (Full Height, in the image), and short (Tracker, in the image).

The 4 symbols used in RM4SCC, from Wikipedia

The first obvious feature we can select is the vertical position of each bar. After all, that’s the information we need to decode the codeword. However, if we choose the 4 points determining each bar, we’d probably end up complicating the decoding process too much.

An easy way out is to choose the centroid position of each bar (just its y-coordinate would be enough). Notice, though, that the long and short bars will share the same value for this feature. If we go down this path, we need another feature to distinguish (at least) the long bar from the short bar. The second obvious feature is therefore the size or, more accurately, the area. This feature will allow us to easily distinguish long from short, but it will be pretty useless for the ascender and the descender.

For the extraction, we need to segment the image and find all the bars, and compute the so-called moments for each of them. The first three moments will be enough for us to get all the features we are interested in.

As a side note, since the segmentation function I have used does not return all segments in order, I had to extract the x-coordinate of each bar so as to be able to sort the vector of symbols.
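
As a sketch of this extraction step (again not the decoder’s exact code; it assumes the thresholded image from the previous step, with white bars on a black background):

import cv2

# findContours returns 2 or 3 values depending on the OpenCV version;
# the list of contours is always the second-to-last element
contours = cv2.findContours(clean, cv2.RETR_EXTERNAL,
                            cv2.CHAIN_APPROX_SIMPLE)[-2]

bars = []
for contour in contours:
    m = cv2.moments(contour)
    if m['m00'] == 0:
        continue
    cx = m['m10'] / m['m00']  # x centroid, only used to sort the bars
    cy = m['m01'] / m['m00']  # y centroid: first feature
    bars.append((cx, cy, m['m00']))  # m00 is the area: second feature

bars.sort()  # restore the left-to-right reading order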

If the code scanned is reasonably horizontal, we should be able to classify all four symbols pretty easily. For this bit I resorted to K-means clustering, although other classification methods can be used with similar results.
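
As a sketch of the clustering step, here using SciPy’s kmeans2 on the (y-centroid, area) features built above, rather than whatever the repository actually does:

import numpy as np
from scipy.cluster.vq import kmeans2, whiten

# normalise the two features so that neither dominates the distance
features = whiten(np.array([(cy, area) for _, cy, area in bars]))

# four clusters: ascender, descender, long (full height) and short (tracker);
# which cluster is which is worked out afterwards by inspecting the centres
centres, labels = kmeans2(features, 4, minit='points')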

Step three: the actual decoding

If we don’t consider the starting and ending symbols, the symbols in between are grouped four by four. For this reason we first need to build a dictionary that maps every valid group of four symbols to the corresponding letter or number.

Finally, a bit of fun when computing the checksum. I translated the algorithm explained here, with the only difference that I wanted to avoid using yet another table to compute the final letter/number, so I implemented the rule behind it instead (which boils down to ensuring ‘bit parity’).

Step four: enjoy it!

And possibly fork, improve and re-release :)

Misconfigured Python pretty printers in GDB

I noticed that running an executable in gdb displayed an error – and only the first time you run the program. The error reads:

(gdb) r
Starting program: /home/chris/workinprogress/cpp/book/a.out 
Traceback (most recent call last):
  File "/usr/share/gdb/auto-load/usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.19-gdb.py", line 63, in 
    from libstdcxx.v6.printers import register_libstdcxx_printers
ImportError: No module named 'libstdcxx'
[Inferior 1 (process 7215) exited normally]
(gdb)

The problem is that the directory containing the libstdcxx module is not in the Python path.
The very simple fix is to add that directory to the Python path in ~/.gdbinit.

$ cat ~/.gdbinit
python
import sys
sys.path.insert(0, '/usr/share/gcc-4.8/python')
from libstdcxx.v6.printers import register_libstdcxx_printers
register_libstdcxx_printers (None)
end

Asserting that an exception has not been raised

A quick way to test that some method does not raise an exception is to call it, catch that very exception, and fail the test if it is raised. Sounds logical – and it is – but I keep forgetting it. So here it is for my future me.

Suppose you have a method that raises an exception if it can’t open a file.

from __future__ import print_function  # only needed on Python 2

import sys


def read_first_line(self, filename):
    try:
        with open(filename, 'r') as f:
            self.first_line = f.readline()
    except IOError:
        print("No such file {0}".format(filename), file=sys.stderr)

Testing that it works correctly when a file does exist is quite simple at this point:

def test_no_exception_is_raised_if_file_exists(self):
    # suppose your instance is in self.sut
    try:
        self.sut.read_first_line('file_that_exists.txt')
    except IOError:
        self.fail("IOError raised but should not have")

Graceful Harakiri

In the past few days I’ve been trying to overcome a problem we saw in our CI environment: tests being abruptly cut short when they hang.

If a build takes too long, our CI tool stops it by sending it a SIGTERM signal. That’s fine in general, but if it’s a test (run by the nosetests driver) that’s taking too long to finish, a SIGTERM causes it to stop immediately, without leaving any trace in the output of where it hung.

What I coded was a plugin, Graceful Harakiri, that intercepts the SIGTERM signal, converts it into an interrupt and, in the process, prints out the current frame, giving some information about where the test got stuck.
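
Stripped of the nose plugin scaffolding, the core idea looks roughly like this (a sketch, not the plugin’s actual code):

import signal
import traceback


def harakiri(signum, frame):
    # show where we were when the CI tool decided to kill us...
    traceback.print_stack(frame)
    # ...then turn the SIGTERM into an ordinary KeyboardInterrupt,
    # which the test runner already knows how to handle
    raise KeyboardInterrupt


signal.signal(signal.SIGTERM, harakiri)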

The code is on GitHub – have a look at the description and use it. Feedback is most welcome.