
A package that simplifies the interface and setup for running pipelines on a distributed cluster.

Clusterer

Idea

The idea behind the library is to simplify distributing multiple processes across different machines. Consider the trivial pipeline dependency example below.

[Image: example pipeline]

But suppose Process B takes longer to handle an item than the interval at which Process A produces them. We can simply run more copies of Process B, so that each copy consumes A's output independently.

This is the problem the library tries to tackle.

How we do it

In the example above, suppose A puts its output on a shared queue on which all the instances of B are waiting. Whichever instance gets A's output processes it and puts the result on the next queue, on which the instances of Process C are waiting.
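
A minimal single-machine sketch of that idea, using only Python's standard `queue` and `threading` modules. Threads stand in for processes that would run on separate machines, and the `None` stop marker is an assumption of this sketch (the library's shutdown mechanism is not described here). Three instances of B share one input queue, and each item goes to whichever instance calls `get()` first.

```python
import queue
import threading


def producer(out_q, items, n_consumers):
    # Process A: put each item on the shared queue
    for item in items:
        out_q.put(item)
    # One stop marker per consumer so every B instance exits (sketch assumption)
    for _ in range(n_consumers):
        out_q.put(None)


def consumer(in_q, out_q):
    # Process B: whichever instance calls get() first receives the item
    while True:
        item = in_q.get()
        if item is None:
            break
        out_q.put(item * item)


def run_pipeline(items, n_consumers=3):
    a_to_b = queue.Queue()
    b_to_c = queue.Queue()
    workers = [threading.Thread(target=consumer, args=(a_to_b, b_to_c))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    producer(a_to_b, items, n_consumers)
    for w in workers:
        w.join()
    # All workers have finished, so draining with empty() is safe here
    results = []
    while not b_to_c.empty():
        results.append(b_to_c.get())
    # Completion order depends on thread scheduling, so sort for display
    return sorted(results)


print(run_pipeline([1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]
```

The order in which B instances finish is not deterministic, which is why the results are sorted before printing.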

So there is a server process that holds all the queues, and there are client processes (the instances that do the actual work). Each client asks the server for the next value in its input queue and, after working on it, asks the server to put the result into the correct queue for consumption by the next process.
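
The `port` and `authkey` arguments passed to `BaseServer` in the example below resemble those of the standard library's `multiprocessing.managers.BaseManager`, which can host queues in one process and serve them to clients over the network. Whether clusterer builds on it is an assumption; this sketch only shows how such a queue server can be written with the standard library. `get_queue`, `start_server` and `connect` are hypothetical names, and the server runs in a background thread so the whole round trip fits in one script.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

# Queue names taken from the example below; the dict lives in the server process
_queues = {name: queue.Queue() for name in ['gen2sq_q', 'sq2sub_q', 'sub2pr_q']}


def get_queue(name):
    # Executed on the server: return the queue registered under `name`
    return _queues[name]


class QueueManager(BaseManager):
    pass


# Clients call get_queue(name) remotely and receive a proxy to the server's queue
QueueManager.register('get_queue', callable=get_queue)


def start_server(authkey=b'password'):
    # Port 0 lets the OS choose a free port; a real deployment would fix one (e.g. 50000)
    manager = QueueManager(address=('127.0.0.1', 0), authkey=authkey)
    server = manager.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def connect(address, authkey=b'password'):
    # Client side: connect to an already running server instead of serving
    client = QueueManager(address=address, authkey=authkey)
    client.connect()
    return client


address = start_server()
client = connect(address)
client.get_queue('gen2sq_q').put({'number': 7})
print(client.get_queue('gen2sq_q').get())  # {'number': 7}
```

On a real cluster the server would listen on a fixed address such as `('', 50000)` and call `serve_forever()` in the foreground, while each client machine registers `get_queue` without a callable and connects to the server's IP.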

Let's get our hands dirty!

Example Functions

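# functions.py
import random
import time
from time import gmtime, strftime
# i_wrap, io_wrap and o_wrap are clusterer's wrappers; import them from the package.

# Process A (Generate a Random Number)
@i_wrap
def Generate():
    time.sleep(2)
    number = random.randint(1,10)
    _time = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    # Print so that you can compare.
    print({'time':_time,'number':number})
    return {'time':_time,'number':number} # it's a dict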

# Process B
@io_wrap
def Square(x):
    # For Sanity check.
    print('Square',x)
    x = x['number'] # Remember it's a dict
    return x*x

# Process C
@io_wrap
def Subtract1(x):
    print('Subtract',x)
    return x-1

# Process D
@o_wrap
def Print(x):
    print('Print',x)
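
What `i_wrap`, `io_wrap` and `o_wrap` do internally is not shown here. Judging from the queue arguments each function receives in the BaseClient example, a plausible reading is: `i_wrap` turns a producer into a worker that writes to an output queue, `io_wrap` into one that reads an input queue and writes an output queue, and `o_wrap` into one that only reads. The hypothetical `*_sketch` decorators below implement that reading, with a `None` end-of-stream marker and an `n_items` limit (both assumptions) so the pipeline terminates; they are not the library's code.

```python
import queue
import random
from functools import wraps

STOP = None  # hypothetical end-of-stream marker


def i_wrap_sketch(func):
    # Source stage: call func() n_items times and push each result to out_q
    @wraps(func)
    def worker(out_q, n_items):
        for _ in range(n_items):
            out_q.put(func())
        out_q.put(STOP)
    return worker


def io_wrap_sketch(func):
    # Middle stage: pull from in_q, apply func, push the result to out_q
    @wraps(func)
    def worker(in_q, out_q):
        while (item := in_q.get()) is not STOP:
            out_q.put(func(item))
        out_q.put(STOP)  # pass the marker downstream
    return worker


def o_wrap_sketch(func):
    # Sink stage: pull from in_q and consume with func; nothing is forwarded
    @wraps(func)
    def worker(in_q):
        while (item := in_q.get()) is not STOP:
            func(item)
    return worker


@i_wrap_sketch
def generate():
    return {'number': random.randint(1, 10)}


@io_wrap_sketch
def square(x):
    return x['number'] ** 2


@io_wrap_sketch
def subtract1(x):
    return x - 1


results = []


@o_wrap_sketch
def collect(x):
    results.append(x)


q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()
# queue.Queue is unbounded, so the stages can run one after another here
generate(q1, 5)
square(q1, q2)
subtract1(q2, q3)
collect(q3)
print(results)  # five values, each n*n - 1 for some n in 1..10
```

A single marker only stops one reader; with several instances sharing a queue, each would need its own marker, and a stage whose function returns `None` would be mistaken for end of stream.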

Now that the functions we want to distribute are ready, let's configure the server and the clients.

BaseServer

# Initialize Server
c = BaseServer(port=50000,authkey=b'password')

# Register queues that server will be handling
c.configure(['gen2sq_q','sq2sub_q','sub2pr_q'])

# Start the server
c.runserver()

BaseClient

from functions import *

if __name__=='__main__':
    # Configure to connect to server.
    c = BaseClient(('server_ip',50000),authkey=b'password')

    # List the functions, their queue arguments and the number of processes of each function to run on this client.
    funcs = [Generate,Square,Subtract1,Print]
    args = [['gen2sq_q'],['gen2sq_q','sq2sub_q'],['sq2sub_q','sub2pr_q'],['sub2pr_q']]
    num_process = [1,1,1,1]
    
    c.process(funcs,args,num_process)

    # To start a single function instead:
    #c.process(Generate,['gen2sq_q'],1)
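
`process` receives three parallel lists: the functions, the queue names each one uses, and how many copies of each to start. A hypothetical sketch of that bookkeeping, which also accepts the single-function form shown in the comment above, is below. Threads replace processes to keep it short, and `resolve` is an assumed callback that turns a queue name into a queue object (on a real client, something like a proxy fetched from the server); `process_sketch` is not the library's `process`.

```python
import threading


def process_sketch(funcs, args, num_process, resolve):
    # Accept the single-function form: process(Generate, ['gen2sq_q'], 1)
    if callable(funcs):
        funcs, args, num_process = [funcs], [args], [num_process]
    if not (len(funcs) == len(args) == len(num_process)):
        raise ValueError('funcs, args and num_process must have the same length')
    workers = []
    for func, queue_names, count in zip(funcs, args, num_process):
        # Resolve names such as 'gen2sq_q' to queue objects once per function
        queues = tuple(resolve(name) for name in queue_names)
        for _ in range(count):
            # Each copy gets the same queues, so copies compete for input items
            t = threading.Thread(target=func, args=queues, daemon=True)
            t.start()
            workers.append(t)
    return workers
```

Because every copy of a function is handed the same input queue, starting more copies is all it takes to add consumers, which is the scaling idea described at the top.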

Broadcast Server and Client

This server and client pair communicates over UDP, so you don't need to find the server IP and type it into the client file.
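
The broadcast protocol itself is not described here. One common way to avoid hard-coding the server IP is for the server to broadcast a small UDP announcement on the local subnet, while the client listens on a known port and takes the sender's address as the server IP. The sketch below assumes a JSON payload, a `DISCOVERY_PORT` of 50001 and a `service` tag, all hypothetical.

```python
import json
import socket

DISCOVERY_PORT = 50001  # hypothetical port for announcements
SERVICE = 'clusterer'   # hypothetical tag so unrelated broadcasts are ignored


def make_announcement(port, name=SERVICE):
    # Payload tells clients which TCP port the queue server listens on
    return json.dumps({'service': name, 'port': port}).encode()


def parse_announcement(data, sender_ip, name=SERVICE):
    # Returns (ip, port) for a valid announcement, otherwise None
    try:
        msg = json.loads(data.decode())
    except ValueError:  # covers both bad UTF-8 and bad JSON
        return None
    if not isinstance(msg, dict) or msg.get('service') != name:
        return None
    port = msg.get('port')
    if not isinstance(port, int):
        return None
    return (sender_ip, port)


def announce(port, times=1):
    # Server side: broadcast the server port to the local subnet
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        for _ in range(times):
            s.sendto(make_announcement(port), ('<broadcast>', DISCOVERY_PORT))


def discover(timeout=5.0):
    # Client side: wait for an announcement; raises socket.timeout if none arrives
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', DISCOVERY_PORT))
        s.settimeout(timeout)
        while True:
            data, (ip, _) = s.recvfrom(1024)
            found = parse_announcement(data, ip)
            if found:
                return found
```

The `(ip, port)` tuple returned by `discover()` has the same shape as the `('server_ip', 50000)` address passed to `BaseClient`.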

Function Server and Client

Improving on Broadcast, here the server tells a newly connected client which functions to run.

Work in progress

What Next?

The remaining problem is that you always need to copy the project files to each client manually.

So the aim is for the server to send all the necessary project files to the client and tell it which processes to run. Processes will be distributed according to user-defined rules/criteria.