Live Log Analyzer

by Eliot


This presentation is an HTML5 website (copied from

Use right arrow key to advance, ctrl +/- to zoom

Source code:

Live demo:

(the right-hand plots update every 5 seconds; the left-hand plots update every 5 minutes)

What does it do?

  • Streams data from remote servers (e.g. Nginx log files)
  • Stores data in MongoDB
  • Queries MongoDB for interesting analytics (e.g. cache hit rate, average upstream response time)
  • Plots analytics data in the browser in real-time

What is it built with?

  • Python
  • MongoDB
  • Orbited, Stomp, Twisted,
  • jQuery, Flot


How is the code organized?

  • sources - classes for getting data from various sources
  • parsers - classes for parsing various formats of data
  • analyzers - classes for querying MongoDB for interesting information
  • reportgenerators - a class which assembles data for display in a browser
  • sourceexecutive and analyzerexecutive - the two executive modules that run everything

sources module

Starts a subprocess and runs an ssh command to get data from a remote server []

from subprocess import Popen, PIPE, STDOUT

class SourceBase(object):
    def start_stream(self):
        self.p = Popen(self.ssh_cmd, shell=True, stdout=PIPE, stderr=STDOUT)

    def _assemble_ssh_command(self):
        """Use the dict, self.ssh_params and return the ssh command to run

    def get_line(self):
        """Use self.p.stdout.readline() to get a line of data

    def filter(self, line):
        """Return the line after optionally altering the line

class SourceLog(SourceBase):
    """Source that tails a log file on a remote host.

    Stores the remote shell command in ``self.cmd`` together with the SSH
    connection parameters and the text encoding of the stream.

    :param ssh_params: dict of SSH connection settings (host, identityfile,
        user, ...), as used by ``SourceBase._assemble_ssh_command``.
    :param filepath: path of the log file on the remote host.
    :param encoding: text encoding of the log stream (default ``'utf-8'``).
    """

    def __init__(self, ssh_params, filepath, encoding='utf-8'):
        # Per GNU tail, ``--follow=name`` re-opens the file by name, so the
        # stream keeps going after the log file is rotated.
        # NOTE(review): ``filepath`` is interpolated into a shell command
        # unquoted; fine for trusted settings, unsafe for untrusted input.
        self.cmd = 'tail --follow=name %s' % filepath
        self.encoding = encoding
        # NOTE(review): SourceBase.start_stream runs ``self.ssh_cmd``, not
        # ``self.cmd``; presumably _assemble_ssh_command combines ssh_params
        # and cmd into ssh_cmd -- confirm against the full source.
        self.ssh_params = ssh_params

parsers module

Use regular expressions to parse data (e.g. from a log file) []

class BaseParser(object):
    def parse_line(cls, line):
        """Parse one line of data using the regex pattern, cls.pattern
        Return the data as a dict

    def convert_time(cls, time_str):
        """Convert date string to datetime object

    def post_process(cls, data):
        """Return optionally modified data

class NginxCacheParser(BaseParser):
    date_format = "%d/%b/%Y:%H:%M:%S"
    pattern = ' '.join([
            r'\*\*\*(?P<time>\S+ -\d{4})',
            r'ups_ad: (?P<ups_ad>.*)',
            r'ups_rt: (?P<ups_rt>.*)',
            # ...

settings for the sources module

Parameters pertaining to the getting and storing of data []

from sources import SourceLog
from parsers import NginxCacheParser

NG_CACHE_COLL = 'ng_cache' # the name of the MongoDB collection

    'us-ng1': {'host': 'us-ng1',
               'hostname': '',
               'identityfile': '/home/saltycrane/sshkeys/myprivatekey',
               'user': 'myusername',
    # ...

    {'source': (SourceLog, {'ssh_params': HOSTS['us-ng1'], 'filepath': '/var/log/nginx/cache.log', 'encoding': 'latin-1',}),
     'parser': NginxCacheParser,
     'collection': NG_CACHE_COLL,
    {'source': (SourceLog, {'ssh_params': HOSTS['us-ng2'], 'filepath': '/var/log/nginx/cache.log', 'encoding': 'latin-1',}),
     'parser': NginxCacheParser,
     'collection': NG_CACHE_COLL,
    # ...

sourceexecutive module

Executive which gets data, parses it, and stores it in MongoDB []

class SourceExecutive(object):
    def __init__(self, settings):
        self.collection = settings['collection']
        self.parser = settings['parser']
        self.source_class = settings['source'][0]
        self.kwargs = settings['source'][1]

    def start(self):

    def start_source_stream(self):
        self.source = self.source_class(**self.kwargs)

    def connect_to_mongo(self):
        """Connect to the MongoDB collection, self.collection

    def store_data(self):
        """Pull lines from the source and parse them, without end.

        Each iteration reads one line with ``self.source.get_line()`` and
        parses it with ``self.parser.parse_line``. The loop has no exit
        condition in this excerpt.
        """
        while True:
            line = self.source.get_line()
            # NOTE(review): ``data`` is not used anywhere in this excerpt;
            # presumably the full source writes it to the MongoDB collection
            # named by self.collection -- confirm against the linked code.
            data = self.parser.parse_line(line)

sourceexecutive module (cont'd)

from settings import SOURCES_SETTINGS

    s = SourceExecutive(settings)

analyzers module

This class is used to query MongoDB for the average upstream response time grouped by upstream server. []

class AvgUpstreamResponseTimePerServerLoggedIn(object):
    def __init__(self, mongo_collection, logged_in):
        self.mongo = mongo_collection
        self.logged_in = logged_in

    def run(self, time_limit):
        self.mongo.ensure_index([('time', ASCENDING), ('ups_rt', ASCENDING),
                                 ('ups_ad', ASCENDING), ('wp_login', ASCENDING), ])
        result =
            condition={'time': {'$gt': time_limit[0], '$lt': time_limit[1]},
                       'ups_rt': {'$ne': '-'},
                       'wp_login': re.compile(self.logged_in),
            initial={'count': 0, 'total': 0},
                function(doc, out) {
           += parseFloat(doc.ups_rt);
            finalize='function(out) {out.avg = / out.count}',
        result = sorted(result, key=lambda item: item['ups_ad']) = [r['avg'] for r in result]

reportgenerators module

Assemble a timeline of datapoints to send to the browser for plotting []

class FlotReportGenerator(object):
    def __init__(self, settings, index, processed_collection):
        """Set attributes to parameters from the settings module and connect to MongoDB

    def connect_to_mongo(self):
        """Connect to the MongoDB collection for storing history data

    def run(self):
        """Generate all the data to be passed to flot

    def create_metadata(self):
        """Assemble metadata data structures to send to the web frontend

    def calc_window_endpoints(self):
        """Calculate the window start and end points to be passed to each analyzer

reportgenerators module (cont'd)

# class FlotReportGenerator(object):
# continued...
    def run_analyzers_for_all_groups(self):
        """Loop through the list of groups (plots) to create and generate

    def write_datapoint_to_mongodb(self):
        """Write datapoint data structure to mongoDB

    def get_and_assemble_history_data_for_flot(self):
        """Get history data from mongoDB and assemble it for transmission to a
        flot stacked bar chart

    def assemble_current_datapoints(self):
        """Assemble a data structure of all the data points for the current
        time formatted according to self.groups[groupname]['format']

    def prepare_output_data(self):
        """Assign all data to a single data structure, self.out

settings for analysis / report generation

Parameters pertaining to analyzing the data []

from analyzers import AvgUpstreamResponseTimePerServerLoggedIn

    'cache0': {
        'label': 'Cache status (non-media)',
        'format': '%.1f%%',
        'collection': NG_CACHE_COLL,
        'flot_options': {'yaxis': {'max': 100, }, },
        'analyzers': [
            (CacheStatus, {'status': 'HIT', 'media': '0'}),
            (CacheStatus, {'status': 'MISS', 'media': '0'}),
            (CacheStatus, {'status': 'EXPIRED', 'media': '0'}),
            (CacheStatus, {'status': 'UPDATING', 'media': '0'}),
            (CacheStatus, {'status': 'STALE', 'media': '0'}),
    'aurt': {
        'label': 'Avg Upstream Resp Time',
        'format': '%.2f',
        'collection': NG_CACHE_COLL,
        'analyzers': [
             {'logged_in': r'^\s*$', }),
    # ...

settings for analysis / report generation (cont'd)

    'channel_name': '/topic/graph',
    'time_periods': [
        {'interval': 5 * 60,                      # in seconds
         'history_length': 144,                   # number of processed data points to save
         'default_window_length': 5 * 60 + 5,     # in seconds
         'default_flot_options': {
                'series': {'stack': 0,
                           'bars': {'show': True,
                                    'barWidth': (5 * 60) * (0.8 * 1000),
                                    'lineWidth': 1, }, },
                'xaxis': {'mode': "time",
                          'timeformat': "%H:%M",
                          'labelWidth': 20,
                          'labelHeight': 8,
                'yaxis': {'min': 0,
         'groups': PLOT_SET,
         # ...

analyzerexecutive module

Queries MongoDB, assembles data, and sends data to the browser. []

from stompservice import StompClientFactory
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from reportgenerators import FlotReportGenerator
from settings import ANALYSIS_SETTINGS

class AnalyzerExecutive(StompClientFactory):
    def __init__(self, settings):
        """Read the analysis settings and prepare empty bookkeeping lists.

        :param settings: dict with ``'channel_name'`` (the STOMP destination
            data is sent to, e.g. ``'/topic/graph'``) and ``'time_periods'``
            (a list of per-period settings dicts, one report generator each).
        """
        # NOTE(review): StompClientFactory.__init__ is not called here;
        # confirm the base class does not need it.
        self.channel_name, self.time_periods = (
            settings['channel_name'], settings['time_periods'])
        self.intervals, self.report_generators = [], []

    def instantiate_report_generators(self):
        """Do this once for each report generator because we want to connect
        to mongoDB only once
        for i, settings in enumerate(self.time_periods):
            processed_collection = 'processed_%d' % i
            rg = FlotReportGenerator(settings, i, processed_collection)

analyzerexecutive module (cont'd)

# class AnalyzerExecutive(StompClientFactory):
# continued...
    def recv_connected(self, msg):
        """Start infinite loops for each of the time periods
        for i in range(len(self.report_generators)):
            self.timer = LoopingCall(self.generate_and_send_data, i)

    def generate_and_send_data(self, i):
        """This is called every loop
        jsondata = json.dumps(self.report_generators[i].out)
        self.send(self.channel_name, jsondata)

# Build the analyzer executive (a StompClientFactory subclass) from the
# analysis settings and connect it to the STOMP server on localhost:61613.
# NOTE(review): no reactor.run() is visible in this excerpt; the Twisted
# reactor must be started for the connection to be made -- confirm.
ae = AnalyzerExecutive(ANALYSIS_SETTINGS)
reactor.connectTCP('localhost', 61613, ae)


  • Use WebSockets instead of Orbited, Stomp, and
  • Make into a YAML file
  • Consider Protovis instead of Flot
  • Add plot legends
  • Clean up process termination