
Parallel

Provides methods for the ATHENA project for running a parallelized version of the software using mpi4py.

Handles parallel operations through MPI when it is available.
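
For orientation, a driver script typically queries its rank and the process count before dividing work. A minimal sketch, assuming the module is importable as athenage.genn.parallel and the script is launched under MPI (e.g. mpirun -np 4 python run.py):

# Minimal sketch; get_rank and get_nprocs are documented below.
# The import path and the printing are illustrative assumptions.
from athenage.genn import parallel

rank = parallel.get_rank()      # this process's id within the MPI run
nprocs = parallel.get_nprocs()  # total number of processes in the run

if rank == 0:
    print(f"root process of {nprocs}")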

continue_run(rank, contin)

Broadcasts the continue flag from the root process; when the flag is False, every process exits and the run ends.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rank | int | process rank in MPI | required |
| contin | bool | when False, stop the run | required |

Returns:

| Type | Description |
| --- | --- |
| None | None |

Source code in src/athenage/genn/parallel.py
def continue_run(rank: int, contin: bool) -> None:
    """ Broadcast the continue flag from the root and stop the run when it is False

    Args:
        rank: process rank in MPI
        contin: when False, stop the run

    Returns:
        None
    """

    # only the root's flag matters; clear it on the other ranks before the broadcast
    if rank != 0:
        contin = None
    contin = comm.bcast(contin, root=0)
    if contin is False:
        exit()
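
Every rank must reach this call together, since it wraps a collective broadcast; non-root ranks exit inside it when the root sends False. A hypothetical per-generation check, where max_gens and should_continue are illustrative stand-ins:

# Hypothetical loop; continue_run is the function above.
for gen in range(max_gens):
    contin = should_continue(gen) if rank == 0 else None  # root decides
    continue_run(rank, contin)  # all ranks exit here when the root sends False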

distribute_data(rank, data, train_splits, test_splits, vmap, grammar)

Broadcast data and related values from the root to all processes so that every process uses the same data and splits.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rank | int | process number in MPI | required |
| data | DataFrame | dataset to analyze | required |
| train_splits | ndarray | indexes for managing training splits | required |
| test_splits | ndarray | indexes for managing testing splits | required |
| vmap | dict | maps each new variable name to the old one | required |
| grammar | str | grammar to use in GE | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| data | DataFrame | dataset to analyze |
| train_splits | ndarray | indexes for managing training splits |
| test_splits | ndarray | indexes for managing testing splits |
| vmap | dict | maps each new variable name to the old one |
| grammar | str | grammar to use in GE |

Source code in src/athenage/genn/parallel.py
def distribute_data(rank: int, data: pd.DataFrame, train_splits: np.ndarray,
                    test_splits: np.ndarray, vmap: dict,
                    grammar: str) -> tuple[pd.DataFrame, np.ndarray, np.ndarray, dict, str]:
    """Broadcast data and related values to all processes from root so that
        all processes use the same data and splits.

    Args:
        rank: process number in MPI
        data: dataset to analyze
        train_splits: contains indexes for managing training splits
        test_splits: contains indexes for managing testing splits
        vmap: dict mapping new variable name to old one
        grammar: contains grammar to use in GE

    Returns:
        data: dataset to analyze
        train_splits: contains indexes for managing training splits
        test_splits: contains indexes for managing testing splits
        vmap: dict mapping new variable name to old one
        grammar: contains grammar to use in GE
    """

    # only the root's copies are broadcast; clear the others first
    if rank != 0:
        data = None
        train_splits = None
        test_splits = None
        vmap = None

    data = comm.bcast(data, root=0)
    train_splits = comm.bcast(train_splits, root=0)
    test_splits = comm.bcast(test_splits, root=0)
    vmap = comm.bcast(vmap, root=0)
    grammar = comm.bcast(grammar, root=0)

    return data, train_splits, test_splits, vmap, grammar
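
A typical call happens once at startup, after the root process has loaded the inputs; afterwards every rank holds identical copies. A hypothetical sketch, where load_inputs is an illustrative helper:

# Hypothetical startup sketch; distribute_data is the function above.
if rank == 0:
    data, train_splits, test_splits, vmap, grammar = load_inputs()  # illustrative
else:
    data = train_splits = test_splits = vmap = grammar = None

data, train_splits, test_splits, vmap, grammar = distribute_data(
    rank, data, train_splits, test_splits, vmap, grammar)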

distribute_params(params, rank)

Broadcast parameters to all processes in the run

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| params | dict | keys are parameter names, values are their settings | required |
| rank | int | process rank in MPI | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| params | dict | dict containing the broadcast parameters |

Source code in src/athenage/genn/parallel.py
def distribute_params(params: dict, rank: int) -> dict:
    """Broadcast parameters to all processes in the run

    Args:
        params: keys are parameter names, values are their settings
        rank: process rank in MPI (unused; the broadcast root is always process 0)

    Returns:
        params: dict containing the broadcast parameters
    """

    params = comm.bcast(params, root=0)
    return params
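
Only the root's dict survives the broadcast, so non-root ranks can pass a placeholder. A hypothetical call, where read_config is an illustrative helper:

# Hypothetical; distribute_params is the function above.
params = read_config("config.yaml") if rank == 0 else None  # illustrative
params = distribute_params(params, rank)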

exchange_best(ind)

Share the best individual with all other processes; returns the gathered list containing every process's best individual.

Source code in src/athenage/genn/parallel.py
def exchange_best(ind: "deap.creator.Individual") -> list:
    """ Share the best individual with all other processes; returns the gathered
    list containing every process's best individual, in rank order."""
    return comm.allgather(ind)
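
Because allgather delivers every rank's candidate to every rank, each process can pick the overall winner locally. A hypothetical sketch, assuming DEAP-style individuals with a fitness.values tuple and a higher-is-better score:

# Hypothetical; exchange_best is the function above.
candidates = exchange_best(my_best)  # one best individual per rank
overall_best = max(candidates, key=lambda ind: ind.fitness.values[0])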

get_nprocs()

Return number of processes in the run

Source code in src/athenage/genn/parallel.py
def get_nprocs() -> int:
    """ Return number of processes in the run"""
    return comm.Get_size()

get_rank()

Return rank within MPI run for this process

Source code in src/athenage/genn/parallel.py
def get_rank() -> int:
    """Return rank within MPI run for this process"""
    return comm.Get_rank()

get_stats(stats, population)

Generate fitness lists for this process and send to the root

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stats | Statistics | deap statistics object | required |
| population | list | individuals in the population | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| dict | dict | statistics generated from the fitness scores of the populations |

Source code in src/athenage/genn/parallel.py
def get_stats(stats: "deap.tools.Statistics", population: list) -> dict:
    """ Generate fitness lists for this process and send to the root

    Args:
        stats: deap statistics object (not used here; summaries are computed directly)
        population: individuals in population

    Returns:
        dict: contains statistics generated from fitness scores of the populations
    """
    # collect this process's valid fitness scores and gather them at the root
    scores = [ind.fitness.values[0] for ind in population if not ind.invalid]
    recv = comm.gather(scores, root=0)
    if proc_rank == 0:
        # flatten the per-process score lists into one list
        scores = [val for xs in recv for val in xs]

    # non-root ranks summarize only their local scores
    return {'avg': np.nanmean(scores),
        'std': np.nanstd(scores),
        'min': np.nanmin(scores),
        'max': np.nanmax(scores)}
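
Only the root's return value summarizes scores from every process; the other ranks summarize just their local populations. A hypothetical per-generation call:

# Hypothetical; get_stats is the function above.
record = get_stats(stats, population)
if get_rank() == 0:
    print(f"avg={record['avg']:.4f} min={record['min']:.4f} max={record['max']:.4f}")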

send_log_info(length, nodes, depth, used_codons, invalid, n_inds, n_unique_structs)

Gather all logging information at the root process

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| length | list | length of each individual | required |
| nodes | list | number of nodes for each individual | required |
| depth | list | depth of each individual in the population | required |
| used_codons | list | number of used codons for each individual | required |
| invalid | int | number of invalid individuals in the population | required |
| n_inds | int | number of individuals in the population | required |
| n_unique_structs | int | number of unique structures created by the population | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| log_data | dict | dict containing compiled logging stats for all processes |

Source code in src/athenage/genn/parallel.py
def send_log_info(length: list, nodes: list, depth: list, used_codons: list, invalid: int,
                  n_inds: int, n_unique_structs: int) -> dict:
    """ Gather all logging information at the root process

    Args:
        length: length of each individual
        nodes: number of nodes for each individual
        depth: depth of each individual in population
        used_codons: number of used codons for each individual
        invalid: number of invalid individuals in population
        n_inds: number of individuals in population
        n_unique_structs: number of unique structures created by population

    Returns:
        log_data: dict containing compiled logging stats for all processes
    """

    # summarize this process's population, then gather the summaries at the root
    log_data = {'sum_length': sum(length), 'n_length': len(length), 'sum_nodes': sum(nodes),
        'n_nodes': len(nodes), 'sum_used_codons': sum(used_codons), 'n_used_codons': len(used_codons),
        'sum_depth': sum(depth), 'n_depth': len(depth), 'invalid': invalid, 'n_inds': n_inds,
        'n_unique_structs': n_unique_structs}
    recv = comm.gather(log_data, root=0)
    if proc_rank == 0:
        # sum each field across all processes
        totals = recv[0]
        for i in range(1, len(recv)):
            for key, value in recv[i].items():
                totals[key] += value
        return totals
    else:
        # non-root ranks return only their local summary
        return log_data
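
At the root, per-population averages can be recovered by dividing each sum by its matching count. A hypothetical post-gather step:

# Hypothetical; send_log_info is the function above.
log = send_log_info(length, nodes, depth, used_codons, invalid, n_inds, n_structs)
if get_rank() == 0:
    avg_length = log['sum_length'] / log['n_length']  # mean length across all ranks
    avg_depth = log['sum_depth'] / log['n_depth']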