Hello and welcome to thoughtwisps! This is a personal collection of notes and thoughts on software engineering, machine learning and the technology industry and community. For my professional website, please see race-conditions.
Thank you for visiting!
05 Apr 2017
What is going on in this SimPy Monitoring example?
This post is long and rambling (not unlike a midsummer Sunday's walk around the Surrey Hills), so you may get frustrated and angry at the author while reading it. The author wishes to apologize to her readers in advance.
SimPy is a simulation library that you can use to easily create event-driven simulations in Python. It's great to play around with if you like simulating queues, traffic systems and monitoring the usage of shared resources (for example, the number of seats currently occupied on a train or the number of cashiers free at a checkout queue). In the simplest cases, you can have SimPy print the usage statistics for your shared resources to stdout so that you can monitor them, but for longer and more complex simulations it is better to have an automated data collection tool.
Luckily, the SimPy documentation comes in handy: the authors have included a helpful example which allows you to patch a SimPy Resource object and monitor it during the simulation. I have reproduced it below, with a few comments removed for the sake of brevity. Decorators and functools are a bit of a deep sea for me, so in this blog post I am going to clarify what is going on in the patch_resource function and why it can be used to monitor resource usage in SimPy simulations.
Patching resources in SimPy
from functools import partial, wraps
import simpy
def patch_resource(resource, pre=None, post=None):
    def get_wrapper(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if pre:
                pre(resource)
            ret = func(*args, **kwargs)
            if post:
                print 'Calling post function'
                post(resource)
            return ret
        return wrapper

    for name in ['put', 'get', 'request', 'release']:
        if hasattr(resource, name):
            setattr(resource, name, get_wrapper(getattr(resource, name)))
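For context, the part of the SimPy documentation example not reproduced here uses partial to bind a shared list to a monitoring callback and then patches the resource with it. A minimal sketch of that wiring, assuming a plain simpy.Resource, looks roughly like this:

def monitor(data, resource):
    # Record the state of the resource at the moment of the call
    item = (
        resource._env.now,    # current simulation time
        resource.count,       # number of users currently holding the resource
        len(resource.queue),  # number of queued requests
    )
    data.append(item)

env = simpy.Environment()
res = simpy.Resource(env, capacity=1)
data = []
# Bind the data list to the callback and attach it as a post-call hook
patch_resource(res, post=partial(monitor, data))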
Whoah, whoah there - functools, decorators, some strange @wraps(func) thing! - let’s take it one step at a time!
Functools is a Python module for higher-order functions - functions that take other functions as arguments, manipulate functions and return new functions. The example above imports two items from the functools module: wraps and partial. Let's take a look at wraps first.
According to the official documentation, wraps is a "convenience function for invoking update_wrapper", another function in the functools module, as a decorator (as the SimPy documentation authors have done in the example above). After reading the documentation for update_wrapper, I'm not really sure what it does - at this point it might be better to poke the function instead of trying to decipher what the documentation means. The update_wrapper function takes a wrapper function and a wrapped function as arguments, so I am going to set up a simple simplefunc that prints out 'hello, world' and a wrapper func, which pretends to do something useful before and after calling simplefunc.
from functools import update_wrapper

def simplefunc():
    print 'hello, world'

def wrapper(func, *args, **kwargs):
    print 'Doing some stuff before calling func'
    value = func(*args, **kwargs)
    print 'Doing some stuff after calling func'
    return value

obj = update_wrapper(wrapper, simplefunc)
According to the documentation, the wrapper function should now look like the wrapped function. If I understood it correctly, I should now be able to call wrapper like simplefunc. Trying that out results in
>>> obj()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: wrapper() takes at least 1 argument (0 given)
so I have clearly missed something. A quick googling lands me on this SO answer, which reveals that update_wrapper has nothing to do with function signatures. What it does instead is carry over important dunder attributes from the original function to the new wrapper function. For example, if we rework the simplefunc and wrapper example above to the following:
def simplefunc():
    '''print hello world'''
    print 'hello world'

def decoratorfunc(f):
    def wrapper(*args, **kwargs):
        print 'f.__name__', f.__name__
        print 'f.__doc__', f.__doc__
        value = f(*args, **kwargs)
        return value
    return wrapper

newFunc = decoratorfunc(simplefunc)
newFunc()
The output is something like this
>>> newFunc()
f.__name__ simplefunc
f.__doc__ print hello world
hello world
However, if we try to print out the name and doc attributes of newFunc, which is essentially our simplefunc just wrapped with some extra behaviour, we get the following
>>> newFunc.__name__
'wrapper'
>>> newFunc.__doc__
>>>
The docstring and name of the original simplefunc have been lost. This is where update_wrapper or its decorator equivalent comes in handy. Let's rewrite the example above to use the @wraps syntax:
from functools import wraps

def decoratorfunc(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        print 'f.__name__', f.__name__
        value = f(*args, **kwargs)
        return value
    return wrapper

newFunc = decoratorfunc(simplefunc)
newFunc()
print newFunc.__name__
print newFunc.__doc__
We get the following output
>>> newFunc()
f.__name__ simplefunc
hello world
>>> newFunc.__name__
'simplefunc'
>>> newFunc.__doc__
'print hello world'
>>>
As the example shows, update_wrapper or its decorator equivalent @wraps preserves the original function's attributes in the decorated function.
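If you are curious about exactly which attributes are carried over, functools exposes them as the WRAPPER_ASSIGNMENTS and WRAPPER_UPDATES constants; in Python 2.7 the copied attributes are the module, name and docstring, and the wrapper's __dict__ is updated from the wrapped function's:

>>> import functools
>>> functools.WRAPPER_ASSIGNMENTS
('__module__', '__name__', '__doc__')
>>> functools.WRAPPER_UPDATES
('__dict__',)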
06 Mar 2017
RxPy is a Python library that allows us to compose reactive programs. Now, "reactive" is quickly becoming one of those buzzwords that everyone throws around but no one really knows how to explain, so for the scope of these notes I'll consider reactive programming to be a paradigm in which programmers think in terms of streams of data instead of individual data values. Libraries that support reactive programming supply operators that can be applied to data streams to achieve the goal of the program. In this set of notes, I will go through some of the common operators available in RxPy.
Filtering a data stream
Let's start with simple examples. One of the simplest manipulations one can perform on a stream of data is filtering it by a certain criterion. For example, we may want to keep only the integers at or above a certain value in our stream of data.
from rx import Observable

# Filter data based on certain criteria
Observable.from_([1, 2, 3, 4, 5])\
    .filter(lambda s: s >= 5)\
    .subscribe(lambda s: print(s))
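Only the value that satisfies the predicate reaches the subscriber, so this prints:
>>>
5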
In addition to filtering, we may also want to limit the number of data points that the subscribers of the Observable see. This can be achieved using the take operator, which takes as an argument the number of items that should be 'taken' from the data stream.
# take 2 items from the data stream
Observable.from_([1, 2, 3, 4, 5])\
    .take(2)\
    .subscribe(lambda s: print(s))
An interesting thing to note is that take behaves gracefully if there are fewer data items in the stream than specified in the argument to take.
Observable.from_([1, 2, 3])\
    .take(5)\
    .subscribe(lambda s: print(s))
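Instead of raising an error, the subscriber simply receives the three items that are available and the stream completes:
>>>
1
2
3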
A variation of take is take_while, which feeds data from the stream to the subscribers for as long as a certain condition holds. In the example below, we will feed data items to the subscriber as long as they are less than 4.
Observable.from_([1, 2, 3, 4, 5, 4, 3, 2])\
    .take_while(lambda s: s < 4)\
    .subscribe(lambda s: print(s))
>>>
1
2
3
Please note that data items which fulfill the criterion but appear after the first 4 in the stream are not passed to the subscriber.
Reducing data streams
In the previous examples, we were mainly concerned with manipulating an incoming data stream and producing another, filtered data stream as an output. In this section, we'll take a look at operators that aggregate data streams in some way. For example, we may want to count the number of items in a certain data stream. In the example below, we count how many city names contain the letter 'n'.
Observable.from_(['Helsinki', 'London', 'Tokyo'])\
    .filter(lambda s: 'N' in s.upper())\
    .count()\
    .subscribe(lambda x: print(x))
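Both 'Helsinki' and 'London' contain an 'n', so the subscriber receives a single value:
>>>
2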
Another simple but commonly used operation is to find the sum of the data items in a stream.
print('Find the sum of items in a data stream')
Observable.from_([1, 2, 3, 4, 5])\
    .sum()\
    .subscribe(lambda s: print(s))
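This prints:
>>>
15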
One thing to notice about the sum operator is that the final result is only returned when all of the items in the data stream have been processed. While this may be ideal in batch processing of data, in a more real-time solution we may want to output a rolling sum after we process each incoming data point. In this case, we can express the sum function as lambda x, y: x + y and use it in the scan operator.
Observable.from_([1, 2, 3, 4, 5])\
    .scan(lambda subtotal, i: subtotal + i)\
    .subscribe(lambda x: print(x))
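Each running total is now pushed to the subscriber as soon as it is computed:
>>>
1
3
6
10
15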
Merging two or more data streams
In addition to filtering and aggregating, we may want to combine multiple data streams into one before performing additional analytics. We can interleave data points from two or more streams using merge.
obs1 = Observable.from_([1, 2, 3])
obs2 = Observable.from_([10, 11, 12])

Observable.merge(obs1, obs2)\
    .subscribe(lambda s: print(s))
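With synchronous, in-memory sources like these, the merged output may well arrive as all the items of one Observable followed by the other; the interleaving becomes more apparent with asynchronous sources, for example interval-based Observables.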
04 Feb 2017
The world is full of data that changes or 'ticks' over time - prices of financial instruments on the market, the quality of air over a given day, the number of Twitter followers a particular user has (I'm sure you can come up with many more!). How best to capture and analyse this data?
One way of engineering data systems around constantly changing streams of data is to design reactive event-driven systems. That is a mouthful that probably has as many definitions as there are practitioners, so it might be best to examine what components are needed to construct a reactive event-driven system.
To build the basics of a reactive system we need Observers (objects that perform some action - 'react' - when a piece of data they are interested in ticks) and Observables, which represent the streams of data. In essence, we could characterise this system as a publish-subscribe system: the Observables publish streams of data to all interested Observers.
Let’s make all of this concrete by implementing a simple example using the RxPy library and Python3.
Suppose the air quality of a specific area is measured using an index that can take values from 1 to 10. Let's design an Observer that subscribes to this stream of mocked air quality data and emits warnings based on the value of the index. The Observer we need to write should inherit from the RxPy library's Observer class and implement three methods:
- on_next, which is triggered when an event from an Observable is emitted
- on_completed, which is called when an Observable has exhausted its stream of data (there are no more events)
- on_error, which is triggered when something goes wrong
from rx import Observer, Observable
from numpy import random

class DataAnalyser(Observer):
    # a class for analyzing data
    def on_next(self, value):
        if value <= 3:
            print('Safe to enjoy the outdoors!')
        else:
            print('Air pollution is high in your area - please take care!')

    def on_completed(self):
        print('Finished analyzing pollution data')

    def on_error(self, error):
        print('Something went wrong in data analysis')
To complete this example, we also need an Observable (our stream of mock air quality data). We can create one very easily using the RxPy Observable class. Finally, we call the Observable's subscribe method to register DataAnalyser as the object interested in the stream of data published by the Observable.
def main():
    # numpy's randint excludes the upper bound, so use 11 to get indices from 1 to 10
    air_pollution_indices = Observable.from_([random.randint(1, 11) for _ in range(20)])
    data_analyser = DataAnalyser()
    air_pollution_indices.subscribe(data_analyser)

if __name__ == '__main__':
    main()
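Running the script prints one message per generated index, followed by 'Finished analyzing pollution data' once the Observable has emitted all twenty values.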
The full sample script is available on Github Gist.
17 Jan 2017
when making a small adjustment becomes an un-testable multi-team problem
Today I’d like to talk about some frustrations that arise when working on a legacy system developed by multiple remote software development teams.
A long time ago, I worked on a system that snapped some realtime ticking data, ran a few computationally expensive calculations (they had to be carried out on a remote server machine) and sent the result to a user's front end. I was placed in charge of building out the infrastructure for client-server communication. The data manipulation libraries that calculated values based on data ticks were developed by an independent team, and I was not given access to modify that code. Although this system had many shortcomings (most introduced by me), a particular pain point was the system of databases and APIs that had grown around the service that supplied ticking data. At the lowest level of the system was a message queue, which monitored the various tick data sources. The data on the queue was pushed into a database, which exposed a direct query API to any clients. However, the query language was more or less intractable to people without deep experience with the database software, and so yet another team was set up to develop an abstraction on top of the database API. None of the APIs was properly documented, and all of them had been designed a few years before I came along.
Thus, the correct functioning of my software depended on a 3rd party calculation library, a database managed by a remote team, a hard-to-use database API managed by the same remote team and another abstraction built on top of the database API to 'make things easier' for the ultimate client applications (such as the system I developed). At first the components of the system cooperated on a fairly regular basis, and eventually thousands of different datatypes were onboarded to the same system. Then the vendor software that governed the values being placed onto the message queue was replaced with a product that did not have the same filtration capabilities. Very soon I started seeing invalid data values propagating into my server-side calculation process, and the 3rd party calculation library struggled to cope with them. It did not have proper error handling to exclude bad data - since the need had never come up, no one had ever thought of developing it. This library was mission critical and used by hundreds of applications. Making any kind of change to it would have required a testing process spanning months and potentially tens of different teams. What made matters worse, I had no direct control over the data values, since the abstraction API on top of the database communicated directly with the 3rd party calculation library without returning any results to my server process.
My next step then was to talk to the abstraction API team, but the solutions that were offered for filtering offending data could not be implemented, because such a change would have to be carried out for all datatypes and not just the one that was causing issues for me. The API had not been designed to provide granularity based on particular data types.
In addition, it was hard to convince the abstraction API team that my problem was a legitimate problem. The data value ticking on the message bus was a legitimate value, but in the context of the business it made very little sense, which is why the computational 3rd party API would have never expected it.
Eventually, after multiple discussions with multiple teams, the fix had to be implemented at the lowest level - the database API.
I think there are a few important lessons and some questions:
1) Segregating software developers into highly specialised teams produces software quickly, but the APIs delivered by such teams can easily ignore the needs of developers working on client libraries.
2) This is a hard one: software should be designed so that it can easily be extended at a future time when requirements change. This fact alone makes comparisons between civil engineering and software engineering hard. I'd imagine that once a team of engineers decides to build a pedestrian bridge, they build a pedestrian bridge. No one will come along and say, "Hey, now your pedestrian bridge will also have to accommodate large trucks." This happens very often in software engineering when one tries to scale an application - the infrastructure that was able to support 100 users simply won't be able to cope with 1,000,000 users.
3) What are the best practices for designing data delivery layers? What features should be a part of APIs that expose realtime data to application developers?
15 Jan 2017
It's the second week of the new year (2017, can you believe it? I know I'll still be writing 2016 on various paperwork for a while until muscle memory adjusts to 2017) and it's the time when my new year's resolutions start to crumble. But fear not - one learns from failure more than from (almost) anything else!
Software engineering
I’m doing a deep dive into Python’s super. I realised last year that peppering the codebase with mixins and then calling super ‘like I super-certainly know what it does’ was not a good idea. First, in large codebases and huge distributed teams, people tend to copy a working example without actually making sure the original author had a clue. Second, I would really like to get to grips with the background of the MRO algorithm (and maybe check out other competing implementations).
In other news, I finally have jekyll build working on my laptop, so I can check out the blog before pushing it online!
Running
This was not a great week for running, since I mostly gave in to the various excuses a tired post-work me came up with, but on Saturday I finally ventured out to explore Regent’s Canal. Although the tow path is slightly harder than the trail-like tow path of the Grand Union Canal, it was still a pleasant run and a lot less muddy than my last weekend’s excursion to Wimbledon Common. The weather was stellar as well and I saw a few people with a house boat using the canal locks!
Goals for next week
- Continue to reclaim my attention. Disconnect from pointless time-wasting social media sites and focus on focusing.
- Sleep enough
- Write a simple talk on socket programming in Python
- Read through Raymond Hettinger’s blog posts on super