race_conditions Coding and running

RxPy Operators with Examples

RxPy is a Python programming library that allows us to compose reactive programs. Now, "reactive" is quickly becoming one of those buzzwords that everyone throws around, but no one really knows how to explain, so for the scope of this article, I'll consider "reactive" programming to be a paradigm of programming where programmers think of streams of data instead of individual data values. Libraries that support reactive programming supply operators that can be applied to data streams to achieve the goal of the program. In this set of notes, I will go through some of the common operators available in RxPy.

Filtering a data stream

Let's start with simple examples. One of the simplest manipulations one can perform on a stream of data is filtering by a certain criterion. For example, we may want to keep only the integers at or above a certain value and filter out the rest.

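from rx import Observable

# Filter data based on certain criteria
# (the input values are illustrative; any iterable of integers works)
Observable.from_([1, 2, 3, 4, 5, 6, 7, 8])\
          .filter(lambda s: s >= 5)\
          .subscribe(lambda s: print(s))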

In addition to filtering, we may also want to limit the number of data points that the subscribers of the Observable see. This can be achieved using the take operator, which takes as an argument the number of items that should be 'taken' from the data stream.

# take 2 items from the data stream (the input values are illustrative)
Observable.from_([1, 2, 3, 4, 5])\
          .take(2)\
          .subscribe(lambda s: print(s))

An interesting thing to note is that take behaves gracefully if there are fewer data items in the stream than specified in the argument to take.

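# illustrative input: only 2 items are available, although take asks for 5
Observable.from_([1, 2]).take(5).subscribe(lambda s: print(s))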

A variation of take is take_while, which feeds data from the stream to the subscribers for as long as a certain condition holds. In the example below, we will feed data items to the subscriber as long as they are less than 4.

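# illustrative input: note the 1 and 2 that follow the first 4
Observable.from_([1, 2, 3, 4, 5, 1, 2])\
          .take_while(lambda s: s < 4)\
          .subscribe(lambda s: print(s))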


Please note that data items which fulfill the criterion but appear after the first number 4 in the stream are not passed to the subscriber.
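
The difference between filter and take_while can also be seen without RxPy. The sketch below is only an analogy: it applies Python's built-in filter and itertools.takewhile to a plain list (the sample values are made up) instead of an Observable, but the stopping behaviour is the same idea.

```python
from itertools import takewhile

# Made-up sample stream; note the 1 and 2 that follow the first 4.
stream = [1, 2, 3, 4, 5, 1, 2]

# filter examines every item, so the late 1 and 2 are kept.
filtered = list(filter(lambda s: s < 4, stream))

# takewhile stops for good at the first item that fails the test.
taken = list(takewhile(lambda s: s < 4, stream))

print(filtered)  # [1, 2, 3, 1, 2]
print(taken)     # [1, 2, 3]
```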

Reducing data streams

In the previous examples, we were mainly concerned with manipulating an incoming data stream and producing another, filtered data stream as an output. In this section, we'll take a look at operators that aggregate data streams in some way. For example, we may want to count the number of items in a certain data stream. In the example below, we count how many city names contain the letter 'N' (ignoring case).

Observable.from_(['Helsinki', 'London', 'Tokyo'])\
          .filter(lambda s: 'N' in s.upper())\
          .count()\
          .subscribe(lambda x: print(x))

Another simple, but commonly used operation is to find the sum of data items in a stream.

print('Find the sum of items in a data stream')
# the input values are illustrative
Observable.from_([1, 2, 3, 4]).sum()\
          .subscribe(lambda s: print(s))

One thing to notice about the sum operator is that the final result is only returned once all of the items in the data stream have been processed. While this may be ideal in batch processing of data, in a more real-time solution we may want to output a rolling sum after we process each incoming data point. In this case, we should express the sum function as lambda x,y: x+y and use it in the scan operator.

# the input values are illustrative
Observable.from_([1, 2, 3, 4])\
          .scan(lambda subtotal, i: subtotal + i)\
          .subscribe(lambda x: print(x))
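
The contrast between a final sum and a rolling sum has a close analogue in the standard library, which may help if RxPy is not at hand. This is a sketch on a plain list with made-up values, not RxPy code: functools.reduce plays the role of sum (one result at the end) and itertools.accumulate plays the role of scan (one result per item).

```python
from functools import reduce
from itertools import accumulate

values = [1, 2, 3, 4]  # made-up sample data


def add(subtotal, i):
    return subtotal + i


# One result, available only after every item has been seen (like sum).
total = reduce(add, values)

# One intermediate result per incoming item (like scan).
rolling = list(accumulate(values, add))

print(total)    # 10
print(rolling)  # [1, 3, 6, 10]
```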

Merging two or more data streams

In addition to filtering and aggregating, we may want to combine multiple data streams into one before performing additional analytics. We can interleave data points from two or more streams using merge.

obs1 = Observable.from_([1,2,3])
obs2 = Observable.from_([10,11,12])

Observable.merge(obs1, obs2)\
           .subscribe(lambda s:print(s))

Data science with Python and RxPy - First steps and notes

The world is full of data that changes or 'ticks' over time - prices of financial instruments on the market, quality of air over a given day, the number of Twitter followers a particular user has (I'm sure you can come up with many more!). How can we best capture and analyse this data? One way of engineering data systems around constantly changing streams of data is to design reactive event-driven systems. That is a mouthful that probably has as many definitions as there are practitioners, so it might be best to examine what components are needed to construct reactive event-driven systems.

To build the basics of a reactive system we need Observers (objects that perform some action - 'react' - when a piece of data they are interested in ticks) and Observables, which represent the streams of data. In essence, we could characterise this system as a publish-subscribe system: the Observables publish streams of data to all interested Observer objects.

Let's make all of this concrete by implementing a simple example using the RxPy library and Python3. Suppose the air quality of a specific area is measured using an index that can take values from 1 to 10. Let's design an Observer that subscribes to this stream of mocked air quality data and emits warnings based on the value of the index. The Observer that we need to write should inherit from the RxPy library's Observer class and implement three functions:

  • on_next which is triggered when an event from an Observable is emitted

  • on_completed which is called when an Observable has exhausted its stream of data (there are no more events)

  • on_error which is triggered when something goes wrong

from rx import Observer, Observable
from numpy import random

class DataAnalyser(Observer):
    #a class for analyzing data
    def on_next(self, value):
        if value<=3:
            print('Safe to enjoy the outdoors!')
        else:
            print('Air pollution is high in your area - please take care!')
    def on_completed(self):
        print('Finished analyzing pollution data')
    def on_error(self, error):
        print('Something went wrong in data analysis')

To complete this example, we also need an Observable (our stream of mock air quality data). We can create one very easily using the RxPy Observable class. Finally, we call the Observable's subscribe method to register DataAnalyser as the object interested in the stream of data published by the Observable.

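def main():
    # numpy's randint excludes the upper bound, so 11 lets the index reach 10
    air_pollution_indices = Observable.from_([random.randint(1, 11) for _ in range(20)])
    data_analyser = DataAnalyser()
    air_pollution_indices.subscribe(data_analyser)

main()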

The full sample script is available on Github Gist.
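
For readers without RxPy installed, the publish-subscribe relationship can be sketched in plain Python. The two classes below, SimpleObservable and RecordingAnalyser, are hypothetical stand-ins invented for this illustration; they are not the RxPy classes and only mimic the on_next / on_completed / on_error contract described above.

```python
class SimpleObservable:
    """Hypothetical stand-in for an Observable; not the RxPy class."""

    def __init__(self, items):
        self._items = list(items)

    def subscribe(self, observer):
        # Push every item to the observer, then signal completion.
        try:
            for item in self._items:
                observer.on_next(item)
        except Exception as error:
            observer.on_error(error)
        else:
            observer.on_completed()


class RecordingAnalyser:
    """Hypothetical observer that records its verdicts instead of printing."""

    def __init__(self):
        self.messages = []

    def on_next(self, value):
        if value <= 3:
            self.messages.append('safe')
        else:
            self.messages.append('high')

    def on_completed(self):
        self.messages.append('done')

    def on_error(self, error):
        self.messages.append('error')


analyser = RecordingAnalyser()
SimpleObservable([2, 7, 3]).subscribe(analyser)
print(analyser.messages)  # ['safe', 'high', 'safe', 'done']
```

Recording the messages in a list rather than printing them makes the observer easy to check in a test, which is one reason to keep the reaction logic separate from the data source.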

Legacy software and disadvantages of highly specialised teams

when making a small adjustment becomes an un-testable multi-team problem

Today I'd like to talk about some frustrations that arise when working on a legacy system developed by multiple remote software development teams.

A long time ago, I worked on a system that snapped some realtime ticking data, carried out a few computationally expensive calculations (they had to be carried out on a remote server machine) and sent the result to a user's front end. I was placed in charge of building out the infrastructure for client-server communication. The data manipulation libraries that calculated values based on data ticks were developed by an independent team and I was not given access to modify this code. Although this system had many shortcomings (most introduced by me), a particular pain point was the system of databases and APIs that had grown around the service that supplied ticking data. At the lowest level of the system was a message queue, which monitored the various tick data sources. The data on the queue was pushed into a database, which exposed a direct query API to any clients. However, the query language was more or less intractable to people without deep experience with the database software, and thus yet another team was set up to develop an abstraction on top of the database API. None of the APIs were properly documented, and all of them had been designed a few years before I came along.

Thus, the correct functioning of my software depended on a 3rd party calculation library, a database managed by a remote team, a hard-to-use database API managed by the same remote team and another abstraction built on top of the database API to 'make things easier' for the ultimate client applications (such as the system I developed). At first the components of the system worked together fairly reliably, and over time thousands of different datatypes were onboarded to the same system. Eventually the vendor software that governed the values being placed into the message queue was replaced with another product that did not have the same filtering capabilities. Very soon I started seeing invalid data values propagating into my server side calculation process and the 3rd party calculation library struggled to cope with them. It did not have proper error handling to exclude bad data - since the need had never come up, no one had ever thought of developing it. This library was mission critical and used by hundreds of applications. Making any kind of change would have required a testing process that would span months and potentially tens of different teams. What made matters worse, I had no direct control over the data values, since the abstraction API on top of the database communicated directly with the 3rd party calculation library without returning any results into my server process.

My next step then was to talk to the abstraction API team, but the solutions that were offered for filtering offending data could not be implemented, because such a change would have to be carried out for all datatypes and not just the one that was causing issues for me. The API had not been designed to provide granularity based on particular data types. In addition, it was hard to convince the abstraction API team that my problem was a legitimate problem. The data value ticking on the message bus was a legitimate value, but in the context of the business it made very little sense, which is why the computational 3rd party API would have never expected it.

Eventually, and only after multiple discussions with multiple teams, the solution had to be moved into the lowest level: the database API.

I think there are a few important lessons and some questions:

1) Segregating software developers into highly specialised teams produces software quickly, but the APIs delivered by such teams can easily ignore the needs of developers working on client libraries.

2) This is a hard one, but software should be designed so that it can easily be extended at a future time when requirements change. This fact alone makes comparisons between civil engineering and software engineering hard. I'd imagine that once a team of engineers decides to build a pedestrian bridge, they build a pedestrian bridge. No one will come along and say, "Hey, now your pedestrian bridge will also have to accommodate large trucks." This happens very often in software engineering when one tries to scale an application - the infrastructure that was able to support 100 users simply won't be able to cope with 1 000 000 users.

3) What are the best practices for designing data delivery layers? What features should be a part of APIs that expose realtime data to application developers?

this week ( week 2)

It's the second week of the new year ( 2017, can you believe it? I know I'll still be writing 2016 on various paperwork for a while until muscle memory adjusts to 2017 ) and it's the time when my new year's resolutions are starting to crumble down. But fear not - one learns from failure more than from (almost) anything else!

Software engineering

I'm doing a deep dive into Python's super. I realised last year that peppering the codebase with mixins and then calling super 'like I super-certainly know what it does' was not a good idea. First, in large codebases and huge distributed teams, people tend to copy a working example without actually making sure the original author had a clue. Second, I would really like to get to grips with the background of the MRO algorithm (and maybe check out other competing implementations).
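
A small sketch of the behaviour in question (the class names are invented for illustration): with multiple inheritance, super() does not mean "my parent class" but "the next class in the method resolution order", which is why a mixin's super call can end up in a sibling it has never heard of.

```python
class Base:
    def describe(self):
        return ['Base']


class LoggingMixin(Base):
    def describe(self):
        # super() resolves to the next class in the MRO of the *instance*.
        return ['LoggingMixin'] + super().describe()


class CachingMixin(Base):
    def describe(self):
        return ['CachingMixin'] + super().describe()


class Service(LoggingMixin, CachingMixin):
    def describe(self):
        return ['Service'] + super().describe()


mro_names = [cls.__name__ for cls in Service.__mro__]
print(mro_names)
# ['Service', 'LoggingMixin', 'CachingMixin', 'Base', 'object']

print(Service().describe())
# ['Service', 'LoggingMixin', 'CachingMixin', 'Base']
```

Note that LoggingMixin's super call reaches CachingMixin here, even though LoggingMixin only inherits from Base.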

In other news, I finally have jekyll build working on my laptop, so I can check out the blog before pushing it online!


Running

This was not a great week for running, since I mostly gave in to the various excuses a tired post-work me came up with, but on Saturday I finally ventured out to explore Regent's Canal. Although the towpath is slightly harder than the trail-like towpath of the Grand Union Canal, it was still a pleasant run and a lot less muddy than last weekend's excursion to Wimbledon Common. The weather was stellar as well and I saw a few people with a houseboat using the canal locks!

Goals for next week

  1. Continue to reclaim my attention. Disconnect from pointless time-wasting social media sites and focus on focusing.
  2. Sleep enough
  3. Write a simple talk on socket programming in Python
  4. Read through Raymond Hettinger's blog posts on super

Code Mesh Day 2

Notes on Day 2 of the Code Mesh 2016 conference in London.

Stateful stream processing

With the main keynote event moved to the afternoon, Day 2 of Code Mesh 2016 launched directly into the sessions. In the morning, I attended Streaming, Database & Distributed Systems: Bridging the Divide by Ben Stopford from Confluent. It seems that distributed systems is the hot computer science topic du jour and I, not wanting to be left out of the cool kids crowd, headed over to listen and learn from Ben's expertise in streaming and distributed systems. Ben outlined two goals for the talk: understand stateful stream processing (SSP) and argue that SSP can be a 'general framework for building data-centric systems'. There are several flavours of data-analytics systems:

  • database (eg Postgres - provides a consistent view of the data )
  • analytics database (eg Hadoop, Spark - specialises in aggregations performed over large datasets )
  • messaging system ( has ephemeral state )
  • stream processing ( manipulate concurrent streams of events and perform computations on the streamed data)
  • stateful stream processing ( a branch of stream processing )

Whereas a database's query engine traverses a finite data table, the stream processor's query engine is designed to operate over an infinite dataset that, however, has to be bounded by a 'window'. The 'window' delineates how many 'ticks' of data are allowed into the stream processor's query engine. The query engine then executes a continuous query and emits data at some frequency. This behaviour is somewhat analogous to a materialised view in a database. The DB takes two tables and a query from the user (some aggregation or grouping of the data) and manifests the result as another table on disk. Materialised views are useful when performance is key (the computation does not have to be repeated every time a user runs the query). The materialised view table is recalculated every time data in either of the tables changes. How does this apply to stateful stream processing? In essence, stateful stream processing is about creating materialised views, which manifest as another table or another stream. SSPs typically use Kafka, a distributed log, to achieve statefulness (I am not entirely sure I understand what exactly Kafka does - I will be doing another post later to clarify some details).
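
To make the 'window' idea a little more tangible, here is a toy sketch of a continuous query over a bounded window. It rests on several assumptions of mine: the window is counted in ticks (real stream processors often window by time), the query is a simple average, and the function name windowed_average is invented. It says nothing about how Kafka or any particular stream processor is implemented.

```python
from collections import deque


def windowed_average(ticks, window_size):
    """Yield the average of the most recent `window_size` ticks after each tick."""
    window = deque(maxlen=window_size)  # old ticks fall out automatically
    for tick in ticks:
        window.append(tick)
        # Re-evaluate the "query" every time new data arrives.
        yield sum(window) / len(window)


prices = [10, 20, 30, 40]  # made-up ticks
averages = list(windowed_average(prices, window_size=2))
print(averages)  # [10.0, 15.0, 25.0, 35.0]
```

Because windowed_average is a generator, it would work just as well on an endless source of ticks: only the last window_size values are ever held in memory.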

New trends in web dev and the effect of machine learning on software engineering

After Ben's talk, it was time to dive into some emerging technologies for web development. Laure Phillips spoke on 'How web programming is more than a server and some clients?' She spoke about the traditional approach of segregating a rich internet application into different tiers (ie backend, frontend, database layer ) and programming each layer with a bespoke language. Then she gave some insights into her own research: developing tierless programming frameworks for internet applications.

After Laure's talk on web dev, it was time to examine how the pervasive presence of machine learning in most modern data systems is affecting the field of software engineering. Twitter's Kovas Boguta spoke about "Machine Learning Models: A New Kind of Software Artifact". Kovas' talk highlighted the challenges that traditional software engineering tools are facing when confronted with non-deterministic models (how do you write tests for a machine learning model? how do you test it sufficiently? ).

Flexible Paxos

This talk by Cambridge PhD student Heidi Howard was on my most-anticipated list - primarily, because it provides a new angle on the work done by Leslie Lamport. Unfortunately, lots of procrastination on the night before the conference meant that I had failed to read Lamport's original Paxos paper and thus was more or less seriously lost during the talk. I plan to publish another blog post soon which will give an in-depth overview of Lamport's original Paxos paper and Heidi Howard's Flexible Paxos modification.