Someone once published a large dataset containing 1.7 billion Reddit comments dated from 2007 to 2015. I downloaded it and never did anything with it. Today I thought I would use it to have some fun with Elasticsearch and Python.

Indexing

Data will be indexed in Elasticsearch. The first step is to set up an index using the official Python library:

import elasticsearch as es

client = es.Elasticsearch(hosts=[{'host': host, 'port': port}])

mappings = {
    'post': {
        'properties': {
            'subreddit': {'type': 'string'},
            'author': {'type': 'string'},
            'body': {'type': 'string'},
            'created': {'type': 'date', 'format': 'epoch_second'},
            'score': {'type': 'integer'},
        }
    }
}

body = {'mappings': mappings}

try:
    client.indices.create(index=index_name, body=body)
except es.exceptions.TransportError as e:
    if e.error != 'index_already_exists_exception':
        raise

We define a mapping to indicate the type of each document property, then create the index if it does not already exist.
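Note that this mapping targets an older Elasticsearch release: the string type was split into text and keyword in 5.x, and document types, used below, were later removed entirely. Once the index exists, a quick sanity check is to read the mapping back, reusing the client and index_name from above:

print(client.indices.get_mapping(index=index_name))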

Reddit comments are stored as JSON objects in compressed files, so we have to read these files and extract the objects one by one. We implement an iterator that reads each line of a compressed file and parses it.

import bz2, json

class DatasetReader(object):
    """Iterate on the objects stored in a reddit dataset."""

    def __init__(self, path):
        self.fp = bz2.open(path, mode='rt')
        self.decoder = json.JSONDecoder()

    def __iter__(self):
        return self

    def __next__(self):
        line = self.fp.readline()
        if not line:
            raise StopIteration
        return self.decoder.decode(line)
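As a quick check, the reader can be pointed at one of the monthly dump files and asked for the first comment (the file name below is illustrative):

reader = DatasetReader('RC_2015-01.bz2')
print(next(reader)['subreddit'])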

We now insert a document for each object, keeping only the information we are interested in. There are a few transformations to do:

  • Parse identifiers, which are stored in base 36 (see the short example after this list).
  • Parse the timestamp indicating the creation date.
  • Detect deleted authors and posts.
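Base 36 needs no extra code: Python's int accepts an explicit base, so an identifier such as 'c0z' (a made-up value) converts directly:

>>> int('c0z', 36)
15587

The loop below applies all three transformations before indexing each document: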

import sys

for obj in DatasetReader(path):
    try:
        post_id_str = obj['id']
        if post_id_str.startswith('t1_'):
            post_id_str = post_id_str[3:]
        post_id = int(post_id_str, 36)
    except ValueError:
        print('ignore post with invalid id "%s"' % (obj['id'],),
              file=sys.stderr)
        continue

    try:
        created_str = obj['created_utc']
        created = int(created_str)
    except ValueError:
        print('ignore post with invalid timestamp "%s"' % (created_str),
              file=sys.stderr)
        continue

    document = {
        'id': post_id,
        'subreddit': obj['subreddit'],
        'created': created,
        'score': obj['score'],
    }

    body = obj['body']
    if body != '[deleted]':
        document['body'] = body

    author = obj['author']
    if author != '[deleted]':
        document['author'] = author

    client.index(index=index_name, doc_type='post', id=post_id,
                 body=document)
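Indexing 1.7 billion comments one client.index call at a time means one HTTP request per document, which is slow. The library ships a bulk helper that batches requests; the sketch below shows how the same loop could feed it (the actions generator, its abbreviated error handling and the chunk size are assumptions, not part of the original program):

from elasticsearch import helpers

def actions(path):
    # Yield one bulk action per comment, applying the same
    # transformations as the loop above (abbreviated here).
    for obj in DatasetReader(path):
        try:
            post_id = int(obj['id'], 36)
            created = int(obj['created_utc'])
        except ValueError:
            continue
        yield {
            '_index': index_name,
            '_type': 'post',
            '_id': post_id,
            '_source': {
                'id': post_id,
                'subreddit': obj['subreddit'],
                'created': created,
                'score': obj['score'],
            },
        }

helpers.bulk(client, actions(path), chunk_size=1000)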

Running queries

It’s now possible to run queries:

results = client.search(index=index_name, q=query)
for result in results['hits']['hits']:
    post = result['_source']
    print(post)
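The q parameter accepts Lucene query string syntax, so field filters and ranges work directly (the subreddit and score values here are only examples):

results = client.search(index=index_name,
                        q='subreddit:programming AND score:[100 TO *]')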

Final program

#!/usr/bin/env python

import argparse, bz2, json, sys
import elasticsearch as es

class DatasetReader(object):
    """Iterate on the objects stored in a reddit dataset."""

    def __init__(self, path):
        self.fp = bz2.open(path, mode='rt')
        self.decoder = json.JSONDecoder()

    def __iter__(self):
        return self

    def __next__(self):
        line = self.fp.readline()
        if not line:
            raise StopIteration
        return self.decoder.decode(line)

def cmd_index_dataset(args):
    """Index a reddit dataset in elasticsearch."""
    path = args.file
    host = args.host
    port = args.port
    index_name = args.index

    client = es.Elasticsearch(hosts=[{'host': host, 'port': port}])

    # Create the index if it does not already exist
    mappings = {
        'post': {
            'properties': {
                'subreddit': {'type': 'string'},
                'author': {'type': 'string'},
                'body': {'type': 'string'},
                'created': {'type': 'date', 'format': 'epoch_second'},
                'score': {'type': 'integer'},
            }
        }
    }

    body = {'mappings': mappings}

    try:
        client.indices.create(index=index_name, body=body)
    except es.exceptions.TransportError as e:
        if e.error != 'index_already_exists_exception':
            raise

    # Insert comments
    for obj in DatasetReader(path):
        try:
            post_id_str = obj['id']
            if post_id_str.startswith('t1_'):
                post_id_str = post_id_str[3:]
            post_id = int(post_id_str, 36)
        except ValueError:
            print('ignore post with invalid id "%s"' % (obj['id'],),
                  file=sys.stderr)
            continue

        try:
            created_str = obj['created_utc']
            created = int(created_str)
        except ValueError:
            print('ignore post with invalid timestamp "%s"' % (created_str),
                  file=sys.stderr)
            continue

        document = {
            'id': post_id,
            'subreddit': obj['subreddit'],
            'created': created,
            'score': obj['score'],
        }

        body = obj['body']
        if body != '[deleted]':
            document['body'] = body

        author = obj['author']
        if author != '[deleted]':
            document['author'] = author

        client.index(index=index_name, doc_type='post', id=post_id,
                     body=document)

def cmd_query(args):
    """Execute an elasticsearch query."""
    query = args.query
    host = args.host
    port = args.port
    index_name = args.index

    client = es.Elasticsearch(hosts=[{'host': host, 'port': port}])

    results = client.search(index=index_name, q=query)
    for result in results['hits']['hits']:
        post = result['_source']
        print(post)

def main():
    parser = argparse.ArgumentParser(prog=sys.argv[0])

    parser.add_argument('-a', '--host', default='localhost', metavar='<host>',
                        help='the elasticsearch host (default: localhost)')
    parser.add_argument('-p', '--port', type=int, default=9200,
                        metavar='<port>',
                        help='the elasticsearch port (default: 9200)')
    parser.add_argument('-i', '--index', default='reddit', metavar='<name>',
                        help='the name of the elasticsearch index '+
                             '(default: reddit)')

    subparsers = parser.add_subparsers(title='commands')

    subparser = subparsers.add_parser('index', help='index a reddit dataset')
    subparser.set_defaults(func=cmd_index_dataset)
    subparser.add_argument('file', metavar='<file>', help='the reddit dataset')

    subparser = subparsers.add_parser('query',
                                      help='run an elasticsearch query')
    subparser.set_defaults(func=cmd_query)
    subparser.add_argument('query', metavar='<query>',
                           help='the elasticsearch query')

    args = parser.parse_args(sys.argv[1:])
    if not hasattr(args, 'func'):
        parser.error('no command specified')
    args.func(args)

if __name__ == "__main__":
    main()
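Assuming the script is saved as reddit.py (the name is arbitrary), a session looks like this:

$ python reddit.py index RC_2015-01.bz2
$ python reddit.py query 'subreddit:programming AND score:[100 TO *]'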