Examples

Streaming

String Input

Counting the number of times each word occurs in a newline-delimited ("\n") string.

examples/streaming_str_ex.py
import re
from mockr import run_stream_job

WORD_RE = re.compile(r"[\w']+")

def map_fn(chunk):
    # yield each word in the line
    for word in WORD_RE.findall(chunk):
        yield (word.lower(), 1)

def reduce_fn(key, values):
    yield (key, sum(values))

input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"

results = run_stream_job(input_str, map_fn, reduce_fn)

print(results)

Output:

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]

Text File Input

Counting the number of times each word occurs in a text file.

examples/streaming_txtfile_ex.py
import re
from mockr import run_stream_job

WORD_RE = re.compile(r"[\w']+")

def map_fn(chunk):
    # yield each word in the line
    for word in WORD_RE.findall(chunk):
        yield (word.lower(), 1)

def reduce_fn(key, values):
    yield (key, sum(values))

with open("MobyDick.txt", 'r') as input_file:
    input_str = input_file.read()

results = run_stream_job(input_str, map_fn, reduce_fn)

print(results)

Output:

[('the', 14697), ('project', 91), ('gutenberg', 92), ('ebook', 10), ('of', 6742), ('moby', 89), ('dick', 88)......]

Pandas

Calculating the average of the “Age” column of the dataframe.

The job runs with n_chunks = 4, meaning that the dataframe is divided into four subsets. Each subset is sent to a different map worker.

Each map worker calculates the mean of its own subset. The reduce worker then averages all of the "mean" values into a global mean.

examples/pandas_ex.py
from mockr import run_pandas_job
import pandas as pd

def map_fn(chunk):
    subset_mean = chunk['Age'].mean()
    yield ("mean", subset_mean)

def reduce_fn(key, values):
    list_values = list(values)
    yield (key, sum(list_values)/len(list_values))

dataframe = pd.read_csv('ages.csv')

results = run_pandas_job(dataframe, map_fn, reduce_fn, n_chunks = 4)

print(results)

Output:

[('mean', 30.090000000000003)]
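Note that averaging the per-chunk means, as above, is only exact when every chunk contains the same number of rows. A more robust alternative is to have each map worker emit a `(sum, count)` pair and let the reduce worker compute the weighted mean. The sketch below shows the idea in plain Python with hand-built chunks (it is not tied to mockr's API; the chunking loop stands in for the framework):

```python
def map_fn(chunk):
    # chunk is a list of ages here; emit the subset's sum and size
    yield ("mean", (sum(chunk), len(chunk)))

def reduce_fn(key, values):
    # weighted mean: total all sums and counts before dividing
    total = count = 0
    for subset_sum, subset_count in values:
        total += subset_sum
        count += subset_count
    yield (key, total / count)

# Simulate two unequal chunks
ages = [20, 30, 40, 50, 60]
chunks = [ages[:2], ages[2:]]  # sizes 2 and 3

grouped = {}
for chunk in chunks:
    for key, value in map_fn(chunk):
        grouped.setdefault(key, []).append(value)

results = [pair for key, values in grouped.items()
           for pair in reduce_fn(key, values)]
print(results)  # [('mean', 40.0)], the exact global mean
```

With these unequal chunks, naively averaging the subset means would give (25 + 50) / 2 = 37.5, while the `(sum, count)` approach recovers the true mean of 40.0.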

Python Sequences (lists, etc.)

Counting the number of times each word occurs in a list of strings.

Processing Items Separately (Dedicated Map Worker)

examples/python_ex.py
import re
from mockr import run_sequence_job

WORD_RE = re.compile(r"[\w']+")

def map_fn(chunk):
    # yield each word in the line
    for word in WORD_RE.findall(chunk):
        yield (word.lower(), 1)

def reduce_fn(key, values):
    yield (key, sum(values))

input_list = ["Hello!", "This is a sample string.", "It is very simple.", "Goodbye!"]

results = run_sequence_job(input_list, map_fn, reduce_fn, n_chunks=None)

print(results)

Output:

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]

Processing Groups of Items (Shared Map Worker)

examples/python_chunks_ex.py
import re
from mockr import run_sequence_job

WORD_RE = re.compile(r"[\w']+")

def map_fn(chunk):
    # join all strings in the chunk
    line = ",".join(chunk)

    # yield each word in the line
    for word in WORD_RE.findall(line):
        yield (word.lower(), 1)

def reduce_fn(key, values):
    yield (key, sum(values))

input_list = ["Hello!", "This is a sample string.", "It is very simple.", "Goodbye!"]

results = run_sequence_job(input_list, map_fn, reduce_fn, n_chunks=2)

print(results)

Output:

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]
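With n_chunks=2, the four items above are grouped into two sub-lists before mapping, so each map worker receives a list of strings rather than a single string. One plausible way to split a sequence into near-equal contiguous chunks is sketched below (an illustration only; mockr's exact splitting strategy may differ):

```python
def chunk_sequence(seq, n_chunks):
    # Split seq into n_chunks contiguous groups of near-equal size;
    # the first `remainder` chunks each take one extra item
    size, remainder = divmod(len(seq), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(seq[start:end])
        start = end
    return chunks

input_list = ["Hello!", "This is a sample string.", "It is very simple.", "Goodbye!"]
print(chunk_sequence(input_list, 2))  # two sub-lists of two items each
```

Each sub-list then becomes one `chunk` argument to map_fn, which is why the shared-worker example joins the strings before tokenising.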