race_conditions Coding and running

Exploring the Perceptron

I used Gaston Sanchez's very helpful "Mathjax with Jekyll" post to write up the mathematics in this post. If you are writing a mathematics-heavy post, you may also want to look at the "Jekyll Extras" documentation and at Fong Chun Chan's "R Markdown to Jekyll: 'Protecting Your Math Equations'".

As another side note: this is by no means amazing production-ready code. Most of this is just exploring and rambling, so please take it as such! Now on to the perceptron!

The Rosenblatt Perceptron was one of the first algorithms that allowed computer programs to learn from data and use the learned "knowledge" to classify previously unseen examples. It was based on an early model of the neuron: the McCulloch-Pitts (MCP) neuron. The MCP neuron receives input data at its dendrites, combines this input data into a single value (we will come back to this part later) and then fires a signal if that value exceeds a certain threshold.

A similar idea is echoed in the Rosenblatt Perceptron. The algorithm receives a set of input data \(\mathbf{x}\). We compute a linear combination \(z\) of the input data \(\mathbf{x}\) and a weight vector \(\mathbf{w}\), and feed it into an activation function \(\phi(z)\). If \(z\) reaches the threshold \(\theta\) (that is, \(z\geq\theta\)), \(\phi(z)\) returns 1: the neuron 'fires' and we classify the input example into class 1. Otherwise, \(\phi(z)\) returns -1 and we classify the input example as -1. These class labels do not necessarily correspond to any 'real world' concepts of 1 and -1, but will prove useful later, when we examine how the perceptron algorithm learns from data.

One of the obvious questions arising from this is 'how do we know the appropriate values to choose for \(\theta\) and \(\mathbf{w}\)?'.

Provided we can supply examples of input data \(\mathbf{x}\) labelled with the true class label, we can train the perceptron algorithm to learn the values of \(\mathbf{w}\) and \(\theta\). In fact, we can rewrite \(\mathbf{w}\) to include \(\theta\). Let's take a look at an example where the input data is two-dimensional, \(\mathbf{x}=(x_{1}, x_{2})\), and the weight vector is \(\mathbf{w}=(w_{1}, w_{2})\). The net input is then \(z=w_1x_1+w_2x_2=\mathbf{w}^{T}\mathbf{x}\), and the neuron fires when \(z\geq\theta\). We can move \(\theta\) to the other side of the inequality to get \(w_1x_1+w_2x_2-\theta\geq 0\). If we now define \(x_0=1\) and \(w_0=-\theta\), and extend the vectors to \(\mathbf{x}=(x_0,x_1,x_2)=(1,x_1,x_2)\) and \(\mathbf{w}=(w_0,w_1,w_2)=(-\theta,w_1,w_2)\), we can express \(z\) as \(\mathbf{w}^{T}\mathbf{x}=w_0x_0+w_1x_1+w_2x_2\), and the neuron fires when \(z\geq 0\).
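A quick numerical sanity check of folding \(\theta\) into the weights may help. The values of \(\theta\), \(w_1\) and \(w_2\) below are arbitrary made-up numbers, not learned weights, and the sketch assumes the \(z\geq\theta\) firing convention: the threshold form and the augmented form should agree on every input.

```python
# Arbitrary illustrative values, not learned weights
theta = 0.5
w1, w2 = 0.4, 0.3


def fires_threshold(x1, x2):
    """Original form: fire when w1*x1 + w2*x2 >= theta."""
    return w1 * x1 + w2 * x2 >= theta


def fires_augmented(x1, x2):
    """Augmented form: x = (1, x1, x2), w = (-theta, w1, w2), fire when w^T x >= 0."""
    x = (1.0, x1, x2)
    w = (-theta, w1, w2)
    z = sum(wj * xj for wj, xj in zip(w, x))
    return z >= 0


inputs = [(1.0, 0.5), (0.0, 0.0), (-1.0, 2.0), (0.5, 0.5), (2.0, 1.0)]
for x1, x2 in inputs:
    # both columns should always match
    print((x1, x2), fires_threshold(x1, x2), fires_augmented(x1, x2))
```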

Now that we have examined the notation involved in the perceptron, let's take a look at the perceptron algorithm.

Perceptron Algorithm

  1. Initialise the values of \(\mathbf{w}\) to \((0,0,0)\).
  2. For each training example \(\mathbf{x}^{i}\) with true class label \(y^{i}\), we
    • compute \(\hat{y}\), the predicted class of the example
    • update each entry \(w_j\) of the weight vector \(\mathbf{w}\), using the formula \[ w_j := w_j + \delta w_j \]
    • \(\delta w_j\) is computed as follows: \[ \delta w_j = \eta (y^i-\hat{y})x_j^i, \] where \(\eta\) is the learning rate, a floating-point number between 0.0 and 1.0. We'll examine how this works in a later post.
  3. Continue iterating over the training data until all examples are classified correctly, or until a maximum number of passes (epochs) has been reached: if the two classes are not linearly separable, some example is always misclassified and the updates never stop.
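To see what a single update in step 2 does, here is a small sketch of one update for one example. It assumes the \(z\geq 0\) firing convention and puts the bias input \(x_0=1\) first; perceptron_update is a hypothetical helper name. Because both \(y^i\) and \(\hat{y}\) are 1 or -1, the factor \(y^i-\hat{y}\) is either 0 (correct prediction, no change) or \(\pm 2\) (wrong prediction, each weight moves by \(\pm 2\eta x_j^i\)).

```python
def perceptron_update(w, x, y, eta):
    """Apply one perceptron update to weights w for a single example.

    x includes the bias input x0 = 1, so w[0] plays the role of -theta.
    y is the true class label (1 or -1).
    """
    z = sum(wj * xj for wj, xj in zip(w, x))   # net input z = w^T x
    y_hat = 1 if z >= 0 else -1                # predicted class
    # delta w_j = eta * (y - y_hat) * x_j, which is zero when the prediction is right
    return [wj + eta * (y - y_hat) * xj for wj, xj in zip(w, x)]


w = [0.0, 0.0, 0.0]
x = [1.0, -0.5, -0.5]        # bias input x0 = 1, then x1, x2
w = perceptron_update(w, x, y=-1, eta=0.5)
print(w)  # [-1.0, 0.5, 0.5]: misclassified, so each weight moves by -2*eta*x_j
w = perceptron_update(w, x, y=-1, eta=0.5)
print(w)  # [-1.0, 0.5, 0.5]: now classified correctly, so nothing changes
```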

A (super quick and dirty, don't-use-this-in-prod) example implementation of the algorithm in Python, together with a small training example, is given below:

def fit(data, eta, max_epochs=10):
    """
    data: list of tuples in the format (x1, x2, y), where y is 1 or -1
    eta: learning rate, a floating-point number between 0.0 and 1.0
    max_epochs: maximum number of passes over the training data
    """
    # w[0] is the bias weight w0 = -theta, paired with the constant input x0 = 1
    w = [0.0, 0.0, 0.0]
    for _ in range(max_epochs):
        misclassified = 0
        for x1, x2, y in data:
            z = w[0]*1 + w[1]*x1 + w[2]*x2       # net input z = w^T x
            y_hat = 1 if z >= 0 else -1          # predicted class
            if y_hat != y:
                misclassified += 1
            # perceptron rule: the update uses the predicted class y_hat, not z
            update = eta*(y - y_hat)
            w[0] = w[0] + update*1
            w[1] = w[1] + update*x1
            w[2] = w[2] + update*x2
        print(w)
        if misclassified == 0:
            # every example was classified correctly in this pass
            break
    print('Finished')
    print(w)
    return w

def main():
    data=[(0.5, 0.5, 1), (-0.5, -0.5, -1)]
    eta = 0.5
    fit(data, eta)

if __name__=='__main__':
    main()

In the next post, we will refactor this implementation à la OOP (the old Java is hard to lose) and write some tests to catch regressions.

References

Python: Deeper Insights into Machine Learning by John Hearty, David Julian and Sebastian Raschka

What is going on in this SimPy Monitoring example?


This post is long and rambling (not unlike a midsummer Sunday's walk around the Surrey Hills), so you may get frustrated and angry at the author while reading it. The author wishes to apologize to her readers in advance.

SimPy is a simulation library that you can use to easily create event-driven simulations in Python. It's great to play around with if you like simulating queues and traffic systems, or monitoring the usage of shared resources (for example, the number of seats currently occupied on a train or the number of cashiers free at a checkout queue). In the simplest cases, you can print the usage statistics for your shared SimPy resources to stdout so that you can monitor them, but for longer and more complex simulations, it is better to have an automated data collection tool.

Luckily, the SimPy documentation comes in handy: the authors have included a helpful example that allows you to patch a SimPy Resource object and monitor it during the simulation. I have reproduced it below, with a few comments removed for the sake of brevity. Decorators and functools are a bit of a deep sea for me, so in this blog post I am going to clarify what is going on in the patch_resource function and why it can be used to monitor resource usage in SimPy simulations.

Patching resources in SimPy

from functools import partial, wraps
import simpy

def patch_resource(resource, pre=None, post=None):
    def get_wrapper(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if pre:
                pre(resource) 
            ret = func(*args, **kwargs)
            if post:
                print 'Calling post function'
                post(resource)
            return ret
        return wrapper
    for name in ['put', 'get', 'request', 'release']:
        if hasattr(resource, name):
            setattr(resource, name, get_wrapper(getattr(resource, name)))
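To see the mechanics without running a full simulation, here is a small sketch that applies the same patch_resource logic (in Python 3 syntax, without the debugging print) to a hypothetical stand-in class, FakeResource, instead of a real simpy.Resource. FakeResource, its request/release methods and the log list are assumptions made for illustration only. In real SimPy, request() returns an event that the process yields. The stand-in returns None, which is enough to show that setattr replaces the methods on this one instance with wrappers that call pre and post around the original call.

```python
from functools import wraps


def patch_resource(resource, pre=None, post=None):
    """Same logic as the SimPy documentation example, in Python 3 syntax."""
    def get_wrapper(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if pre:
                pre(resource)
            ret = func(*args, **kwargs)
            if post:
                post(resource)
            return ret
        return wrapper
    for name in ['put', 'get', 'request', 'release']:
        if hasattr(resource, name):
            setattr(resource, name, get_wrapper(getattr(resource, name)))


class FakeResource:
    """Hypothetical stand-in for simpy.Resource: just counts its users."""
    def __init__(self):
        self.count = 0

    def request(self):
        self.count += 1

    def release(self):
        self.count -= 1


log = []  # collects (stage, count) snapshots taken by the callbacks

res = FakeResource()
patch_resource(res,
               pre=lambda r: log.append(('pre', r.count)),
               post=lambda r: log.append(('post', r.count)))

res.request()
res.request()
res.release()
print(log)
# [('pre', 0), ('post', 1), ('pre', 1), ('post', 2), ('pre', 2), ('post', 1)]
print(res.request.__name__)  # 'request', preserved by @wraps
```

Because the patching happens on the instance, other FakeResource objects (or other SimPy resources) keep their original, unmonitored methods.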

Whoa, whoa there! functools, decorators, some strange @wraps(func) thing... let's take it one step at a time!

Functools

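functools is a Python standard library module for higher-order functions: functions that take other functions as arguments, manipulate functions and return new functions. The example above imports two items from the functools module, wraps and partial. Only wraps is used inside patch_resource itself; in the full example in the SimPy documentation, partial is used further down to bind a data list to the monitoring callback. Let's take a look at wraps first.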

According to the official documentation, wraps is a "convenience function for invoking update_wrapper" (another function in the functools module) as a decorator, which is how the SimPy documentation authors use it in the example above. After reading the documentation for update_wrapper, I'm not really sure what it does, so at this point it might be better to poke the function instead of trying to decipher what the documentation means. The update_wrapper function takes a wrapper function and a wrapped function as arguments, so I am going to set up a simple function, simplefunc, that prints out 'hello, world', and a wrapper function, which pretends to do something useful before and after calling simplefunc.

from functools import update_wrapper

def simplefunc():
    print 'hello, world'

def wrapper(func, *args, **kwargs):
    print 'Doing some stuff before calling func'
    value = func(*args, **kwargs)
    print 'Doing some stuff after calling func'
    return value

obj = update_wrapper(wrapper, simplefunc)

According to the documentation, the wrapper function should now look like the wrapped function. If I understood it correctly, I should now be able to call wrapper just like simplefunc.

obj()

results in

>>> obj()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: wrapper() takes at least 1 argument (0 given)

so I have clearly missed something. A quick googling lands me on this SO answer and reveals that update_wrapper does not change the wrapper's signature: wrapper still expects func as its first argument. Instead, it copies important dunder attributes, such as __name__ and __doc__, from the original (wrapped) function onto the wrapper function. For example, if we rework the simplefunc and wrapper example above to the following:


def simplefunc():
    '''print hello world'''
    print 'hello world'

def decoratorfunc(f):
    def wrapper(*args, **kwargs):
        print 'f.__name__', f.__name__
        print 'f.__doc__', f.__doc__
        value = f(*args, **kwargs)
        return value
    return wrapper

newFunc = decoratorfunc(simplefunc)
newFunc()

The output is something like this

>>> newFunc()
f.__name__ simplefunc
f.__doc__ print hello world
hello world

However, if we try to print out the __name__ and __doc__ attributes of newFunc, which is essentially our simplefunc wrapped with some extra behaviour, we get the following:

>>> newFunc.__name__
'wrapper'
>>> newFunc.__doc__
>>> 

The docstring and name of the original simplefunc have been lost. This is where update_wrapper or its decorator equivalent can come in handy. Let's rewrite the example above to use the @wraps syntax:

from functools import wraps

def decoratorfunc(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        print 'f.__name__', f.__name__
        value = f(*args, **kwargs)
        return value
    return wrapper

newFunc = decoratorfunc(simplefunc)
newFunc()
print newFunc.__name__
print newFunc.__doc__

We get the following output

>>> newFunc()
f.__name__ simplefunc
hello world
>>> newFunc.__name__
'simplefunc'
>>> newFunc.__doc__
'print hello world'
>>> 

As the example shows, update_wrapper, or its decorator equivalent @wraps, preserves the original function's attributes (here, __name__ and __doc__) in the decorated function.
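That leaves partial, the other functools import at the top of the SimPy example. As far as I can tell from the SimPy documentation, the full monitoring example uses it to bind a data list as the first argument of a monitoring callback, so that the callback fits patch_resource, which only ever calls post(resource). Here is a minimal sketch of that idea, with a hypothetical monitor function and plain strings standing in for the resource:

```python
from functools import partial


def monitor(data, resource):
    """Hypothetical monitoring callback: record a snapshot of the resource."""
    data.append(resource)


data = []
# partial fixes the first argument, leaving a one-argument callable
post = partial(monitor, data)

post('resource state 1')   # same as monitor(data, 'resource state 1')
post('resource state 2')
print(data)  # ['resource state 1', 'resource state 2']
```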

RxPy Operators with Examples

RxPy is a Python programming library that allows us to compose reactive programs. Now, "reactive" is quickly becoming one of those buzzwords that everyone throws around, but no one really knows how to explain, so for the scope of this article, I'll consider "reactive" programming to be a paradigm of programming where programmers think of streams of data instead of individual data values. Libraries that support reactive programming supply operators that can be applied to data streams to achieve the goal of the program. In this set of notes, I will go through some of the common operators available in RxPy.

Filtering a data stream

Let's start with simple examples. One of the simplest manipulations one can perform on a stream of data is filtering it by some criterion. For example, we may want to keep only the integers at or above a certain value in our stream of data. The example below passes only the items greater than or equal to 5 to the subscriber.

from rx import Observable 

#Filter data based on certain criteria
Observable.from_([1,2,3,4,5])\
          .filter(lambda s: s>=5)\
          .subscribe(lambda s: print(s))

In addition to filtering, we may also want to limit the number of data points that the subscribers of the Observable see. This can be achieved using the take operator, which takes as an argument the number of items that should be 'taken' from the data stream.

#take 2 items from the data stream
Observable.from_([1,2,3,4,5])\
           .take(2)\
           .subscribe(lambda s: print(s))

An interesting thing to note is that take behaves gracefully if there are fewer data items in the stream than specified in the argument to take: in the example below, the subscriber simply receives all three items.

Observable.from_([1,2,3])\
           .take(5)\
           .subscribe(lambda s: print(s))
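For readers more familiar with plain Python iterables, take(n) behaves much like itertools.islice. The sketch below uses islice purely as a standard library analogy (it is not RxPy) to show the same graceful behaviour when the source is shorter than n.

```python
from itertools import islice

# take 2 of 5 items
first_two = list(islice([1, 2, 3, 4, 5], 2))
# ask for 5 items from a 3-item source: no error, just the 3 available items
all_three = list(islice([1, 2, 3], 5))
print(first_two)  # [1, 2]
print(all_three)  # [1, 2, 3]
```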

A variation of take is take_while, which feeds data from the stream to the subscribers for as long as a certain condition holds, and stops as soon as it encounters the first item that fails the condition. In the example below, we feed data items to the subscriber as long as they are less than 4.

Observable.from_([1,2,3,4,5,4,3,2])\
          .take_while(lambda s: s<4)\
          .subscribe(lambda s: print(s))

>>>
1
2
3

Note that the data items 3 and 2, which fulfil the condition but appear after the first 4 in the stream, are not passed to the subscriber.
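The difference between take_while and filter mirrors the difference between itertools.takewhile and the built-in filter on ordinary Python lists. The sketch below is a standard library analogy, not RxPy code, run on the same data as the example above.

```python
from itertools import takewhile

stream = [1, 2, 3, 4, 5, 4, 3, 2]

# takewhile stops at the first item that fails the predicate (like take_while)
taken = list(takewhile(lambda s: s < 4, stream))
# filter checks every item and keeps all that pass (like the filter operator)
filtered = list(filter(lambda s: s < 4, stream))

print(taken)     # [1, 2, 3]
print(filtered)  # [1, 2, 3, 3, 2]
```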

Reducing data streams

In the previous examples, we were mainly concerned with manipulating an incoming data stream and producing another, filtered data stream as an output. In this section, we'll take a look at operators that aggregate data streams in some way. For example, we may want to count the number of items in a certain data stream. In the example below, we count how many city names contain the letter 'n', ignoring case (each name is converted to upper case before checking for 'N').

Observable.from_(['Helsinki', 'London', 'Tokyo'])\
          .filter(lambda s: 'N' in s.upper() )\
          .count()\
          .subscribe(lambda x:print(x))

Another simple, but commonly used operation is to find the sum of data items in a stream.

print('Find the sum of items in a data stream')
Observable.from_([1,2,3,4,5])\
           .sum()\
           .subscribe(lambda s: print(s)) 

One thing to notice about the sum operator is that the final result is only emitted once all of the items in the data stream have been processed. While this may be ideal for batch processing, in a more real-time solution we may want to output a rolling sum after we process each incoming data point. In this case, we can express the sum as a function of two arguments, such as lambda subtotal, i: subtotal+i, and pass it to the scan operator.

Observable.from_([1,2,3,4,5])\
          .scan(lambda subtotal, i: subtotal+i)\
          .subscribe(lambda x: print(x))
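The same batch-versus-rolling distinction exists in the standard library: sum produces one value at the end, while itertools.accumulate yields a running subtotal per item, much like scan. This is an analogy on a plain list, not RxPy code.

```python
from itertools import accumulate

stream = [1, 2, 3, 4, 5]

# sum(): a single value, available only after the whole stream has been seen
total = sum(stream)
# accumulate(): one running subtotal per incoming item, like the scan operator
rolling = list(accumulate(stream, lambda subtotal, i: subtotal + i))

print(total)    # 15
print(rolling)  # [1, 3, 6, 10, 15]
```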

Merging two or more data streams

In addition to filtering and aggregating, we may want to combine multiple data streams into one before performing additional analytics. We can interleave data points from two or more streams using merge.

obs1 = Observable.from_([1,2,3])
obs2 = Observable.from_([10,11,12])

Observable.merge(obs1, obs2)\
           .subscribe(lambda s:print(s))
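In the example above, both sources have all of their items available immediately, so as far as I know the exact order in which they are printed depends on how RxPy schedules the two subscriptions, and I would not rely on a particular interleaving. Conceptually, merge passes each item on as soon as its source emits it. As a standard library analogy (not RxPy), imagine each stream as a list of (timestamp, value) pairs with hypothetical arrival times. heapq.merge then produces the order in which a subscriber would see the items if they really arrived at those times.

```python
import heapq

# Hypothetical arrival times (in seconds) for the items of two streams
obs1 = [(0.0, 1), (2.0, 2), (4.0, 3)]
obs2 = [(1.0, 10), (3.0, 11), (5.0, 12)]

# heapq.merge lazily interleaves already-sorted inputs, here ordered by timestamp
merged = [value for _, value in heapq.merge(obs1, obs2)]
print(merged)  # [1, 10, 2, 11, 3, 12]
```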

Data science with Python and RxPy - First steps and notes

The world is full of data that changes or 'ticks' over time: prices of financial instruments on the market, the quality of air over a given day, the number of Twitter followers a particular user has (I'm sure you can come up with many more!). How can we best capture and analyse this data? One way of engineering data systems around constantly changing streams of data is to design reactive event-driven systems. That is a mouthful that probably has as many definitions as there are practitioners, so it might be best to examine what components are needed to construct reactive event-driven systems.

To build the basics of a reactive system we need Observers (objects that perform some action, or 'react', when a piece of data they are interested in ticks) and Observables, which represent the streams of data. In essence, we could characterise this system as a publish-subscribe system: the Observables publish streams of data to all interested Observers.

Let's make all of this concrete by implementing a simple example using the RxPy library and Python 3. Suppose the air quality of a specific area is measured using an index that can take values from 1 to 10. Let's design an Observer that subscribes to this stream of mocked air quality data and emits warnings based on the value of the index. The Observer that we need to write should inherit from the RxPy library's Observer class and implement three methods:

  • on_next which is triggered when an event from an Observable is emitted

  • on_completed which is called when an Observable has exhausted its stream of data (there are no more events)

  • on_error which is triggered when something goes wrong

from rx import Observer, Observable
from numpy import random

class DataAnalyser(Observer):
    #a class for analyzing data
    def on_next(self, value):
        if value<=3:
            print('Safe to enjoy the outdoors!')
        else:
            print('Air pollution is high in your area - please take care!')
    def on_completed(self):
        print('Finished analyzing pollution data')
    def on_error(self, error):
        print('Something went wrong in data analysis')

To complete this example, we also need an Observable (our stream of mock air quality data). We can create one very easily using the RxPy Observable class. Finally, we call the Observable's subscribe method to register our DataAnalyser instance as the object interested in the stream of data published by the Observable.

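def main():
    # numpy's randint excludes the upper bound, so use 11 to get indices 1 to 10
    air_pollution_indices = Observable.from_([random.randint(1, 11) for _ in range(20)])
    data_analyser = DataAnalyser()
    air_pollution_indices.subscribe(data_analyser)

if __name__ == '__main__':
    main()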

The full sample script is available on Github Gist.
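For readers who want to see the publish-subscribe mechanics without the library, here is a minimal pure-Python sketch. MiniObservable, its run method and WarningPrinter are hypothetical names of my own, not RxPy's API. The observable pushes each value to every subscribed observer's on_next, calls on_error if producing the next value raises, and calls on_completed once the data is exhausted. As a simplification, it only starts emitting when run() is called, rather than when an observer subscribes.

```python
class MiniObservable:
    """Hypothetical push-based stream over any iterable source."""

    def __init__(self, source):
        self.source = source
        self.observers = []

    def subscribe(self, observer):
        self.observers.append(observer)

    def run(self):
        """Push every value to every observer, then signal completion."""
        iterator = iter(self.source)
        while True:
            try:
                value = next(iterator)
            except StopIteration:
                break
            except Exception as error:
                # the source failed: notify observers and stop emitting
                for observer in self.observers:
                    observer.on_error(error)
                return
            for observer in self.observers:
                observer.on_next(value)
        for observer in self.observers:
            observer.on_completed()


class WarningPrinter:
    """Observer with the same three methods as DataAnalyser, recording messages."""

    def __init__(self):
        self.messages = []

    def on_next(self, value):
        self.messages.append('safe' if value <= 3 else 'high')

    def on_completed(self):
        self.messages.append('done')

    def on_error(self, error):
        self.messages.append('error')


printer = WarningPrinter()
stream = MiniObservable([2, 7, 3])
stream.subscribe(printer)
stream.run()
print(printer.messages)  # ['safe', 'high', 'safe', 'done']
```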

Legacy software and disadvantages of highly specialised teams

when making a small adjustment becomes an un-testable multi-team problem

Today I'd like to talk about some frustrations that arise when working on a legacy system developed by multiple remote software development teams.

A long time ago, I worked on a system that snapped some realtime ticking data, carried out a few computationally expensive calculations (they had to be carried out on a remote server machine) and sent the result to a user's front end. I was placed in charge of building out the infrastructure for client-server communication. The data manipulation libraries that calculated values based on data ticks were developed by an independent team, and I was not given access to modify this code. Although this system had many shortcomings (most introduced by me), a particular pain point was the system of databases and APIs that had grown around the service that supplied ticking data. At the lowest level of the system was a message queue, which monitored the various tick data sources. The data on the queue was pushed into a database, which exposed a direct query API to any clients. However, the query language was more or less intractable to people without deep experience with the database software, and thus yet another team was set up to develop an abstraction on top of the database API. None of the APIs was properly documented, and all of them had been designed a few years before I came along.

Thus, the correct functioning of my software depended on a 3rd party calculation library, a database managed by a remote team, a hard-to-use database API managed by the same remote team and another abstraction built on top of the database API to 'make things easier' for the ultimate client applications (such as the system I developed). At first the components of the system cooperated fairly reliably, and over time thousands of different datatypes were onboarded to the same system. Eventually the vendor software that governed the values being placed into the message queue was replaced with different software that did not have the same filtering capabilities. Very soon I started seeing invalid data values propagating into my server-side calculation process, and the 3rd party calculation library struggled to cope with them. It did not have proper error handling to exclude bad data: since the need had never come up, no one had ever thought of developing it. This library was mission critical and used by hundreds of applications. Making any kind of change would have required a testing process spanning months and potentially tens of different teams. To make matters worse, I did not have any direct control over the data values, since the abstraction API on top of the database communicated directly with the 3rd party calculation library without returning any results to my server process.

My next step then was to talk to the abstraction API team, but the solutions that were offered for filtering offending data could not be implemented, because such a change would have to be carried out for all datatypes and not just the one that was causing issues for me. The API had not been designed to provide granularity based on particular data types. In addition, it was hard to convince the abstraction API team that my problem was a legitimate problem. The data value ticking on the message bus was a legitimate value, but in the context of the business it made very little sense, which is why the computational 3rd party API would have never expected it.

Eventually, the solution had to be moved into the lowest level - the database API - only after multiple discussions with multiple teams.

I think there are a few important lessons and some questions:

1) Segregating software developers into highly specialised teams produces software quickly, but the APIs delivered by such teams can easily ignore the needs of developers working on client libraries.

2) This is a hard one, but software should be designed so that it can easily be extended at a future time when requirements change. This fact alone makes comparisons between civil engineering and software engineering hard. I'd imagine that once a team of engineers decides to build a pedestrian bridge, they build a pedestrian bridge. No one will come along and say, "Hey, now your pedestrian bridge will also have to accommodate large trucks." This happens very often in software engineering when one tries to scale an application: the infrastructure that was able to support 100 users simply won't be able to cope with 1 000 000 users.

3) What are the best practices for designing data delivery layers? What features should be a part of APIs that expose realtime data to application developers?