Gathering Time Series Data

A brief description of an efficient solution to gather new time series data

10.23.2015

Introduction

I have been working on an application that gathers time series data from sensors, located at sites throughout the valley. The application is one piece of a larger system of software (ODM2). Since the data is in almost real time, it’s valuable for many reasons. For instance, it can be used to create forecasting and flood risk models. The data is also used to determine things like, “The water just increased by a foot — here comes a flood wave,” or, “Looks like we had more rainfall one year ago than we do today.”

Here is the context for this particular situation:

  • A text file on a web server is updated periodically with streaming time series data.
  • An application periodically connects to the web server to read the file.
  • The time series data is mapped to tables in a relational database for storage and future analysis.

Challenges

The main problem is that the speed of the application is negatively affected as the data file grows larger. It is inefficient to read the entire data file each time, because new data is constantly appended to it. Optimizing the speed of the application is one of the problems to solve.

Solution

One way to dramatically improve the speed of the application is to keep a record of the last known byte that was read from the data file. Each time the application reads the data file, it will first check inside of a configuration file to see what the last byte read was. If the file is opened in binary mode, it’s easy to skip, or seek to that byte number and begin reading from there. After a successful file read, the last byte is logged in the configuration file.

This method works great for local files that exist on the file system. Before this same method will work for remote files, there are a couple extra steps to take. First, check what the path looks like. If the path begins with http:// then it is obviously a web address. Next, simply read the entire data file. Python has a great module that makes this an easy two-line process. Store the data in a temporary file on the file system. Tempfile works great for this. Reading/writing the entire data file may seem like a bottleneck, but it ends up not making much of a difference. The temporary file is deleted as soon as the application is done using it.

With the data on the web now temporarily stored in a local file, the same method mentioned above can now be applied. By reading only the new data in a file, the application’s efficiency is now only closely related to how much new data has been uploaded since the last read.

Algorithm and Python Implementation

Here is a sample Python function that will read a file — either local, or remote — and return the newest data.

Note: This function could be refactored and divided into several smaller functions that each perform one specific part of the process.

import pandas as pd
import logging
from StringIO import StringIO
import tempfile
import os
import urllib2

def byteReader(filepath, start_byte, datecol, header=0, sep=None, dataBegin=0):
    try:
        if filepath.startswith('http://'):
            response = urllib2.urlopen(filepath)
            data = response.read()
            temp = tempfile.NamedTemporaryFile()
            try:
                temp.write(data)
                temp.seek(0)
            finally:
                df = byteReader(temp.name, start_byte, datecol, header, sep, dataBegin)
                temp.close()
                return df
    except AttributeError:
        logger.error("Could not read data file!")
    df = pd.DataFrame
    # Check if the data has been modified.
    if int(os.path.getsize(filepath)) < start_byte:
        logger.info('Previous data has been modified.')
        start_byte = 0
    try:
        with open(filepath, 'rb') as f:
            logger.info('Reading from byte %d.' % start_byte)
            # If we are going to skip to the new location, we need
            # to make sure and grab the headers for Pandas.
            if start_byte > 0:
                header_names = ''
                for i in range(header):
                    header_names = f.next()
                f.seek(int(start_byte))
                new_data = f.read()
                finished_data = header_names + new_data
                if new_data:
                    logger.info('New Data.')
                    logger.debug(finished_data)
                else:
                    logger.info('No new data.')
                df = pd.read_csv(StringIO(finished_data),
                                    sep=str(sep),
                                    engine='python')
                df.set_index(datecol, inplace=True)
            else:
                # Just begin at the start of the file.
                f.seek(0)
                finished_data = f.read()
                logger.info('New data.')
                logger.debug(finished_data)
                df = pd.read_csv(StringIO(finished_data),
                                    header=(header - 1),
                                    sep=str(sep),
                                    engine='python')
                df = df.ix[(dataBegin - header) - 1:]
                df.set_index(datecol, inplace=True)
    except IOError as e:
        logger.error('Skipping "%s" because of %s' % (filepath, e))
    except Exception as e2:
        logger.error('Exception: %s' % e2.message)
    return df