
Data processing

Provides methods for the ATHENA project for processing input files, transforming data, and writing reports and plots.

Reads data files and transforms data

compress_weights(model_str)

Compresses weights/constants to simplify the model string

Source code in src/athenage/utilities/data_processing.py
def compress_weights(model_str):
    """ Compresses weights/constants to simplify the model string"""
    if re.search(r"[PA|PD|PM|PS|PAND|PNAND|POR|PXOR|PNOR]", model_str):
        return compress_weights_nn(model_str)
    else:
        return compress_weights_sr(model_str)
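
Example (illustrative): the dispatch test above, shown standalone. The phenotype strings are placeholders, not actual ATHENA output.

import re

node_pattern = r"PA|PD|PM|PS|PAND|PNAND|POR|PXOR|PNOR"
print(bool(re.search(node_pattern, "PA(0.50 * x[0], 1.20 * x[1])")))  # True: routed to compress_weights_nn
print(bool(re.search(node_pattern, "x[0] + pdiv(x[1], 2.0)")))        # False: routed to compress_weights_sr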

construct_nodes(modelstr)

Returns node objects representing the network

Parameters:

    modelstr: String containing GE network

Returns:

    nodes constructed from the model

Source code in src/athenage/utilities/data_processing.py
def construct_nodes(modelstr:str) -> list:
    """
    Returns node objects representing the network

     Parameters:
        modelstr: String containing GE network
     Returns
       nodes constructed from the model
    """ 
    if re.search(r"PA|PS|PD|PM|PAND|PNAND|POR|PXOR|PNOR]", modelstr):
        return construct_nodes_nn(modelstr)
    else:
        return construct_nodes_sr(modelstr)

construct_nodes_nn(modelstr)

Returns node objects representing the network

Parameters:

    modelstr: String containing GE neural network model

Returns:

    nodes constructed from the model

Source code in src/athenage/utilities/data_processing.py
def construct_nodes_nn(modelstr:str) -> list:
    """
    Returns node objects representing the network

     Parameters:
        modelstr: String containing GE neural network model
     Returns
       nodes constructed from the model
    """ 

    model = modelstr.replace('([', ' [').replace('])', '] ').replace('(', ' ( ').replace(')', ' ) ')
    ignore = {','}
    elements = model.split()

    stack = deque()

    stack.append(elements[0])
    i = 1
    nodes = []

    # use stack to construct the nodes/edges
    while stack:
        if elements[i] in ignore:
            i+=1
        elif elements[i] == ')':
            enditem = elements[i]
            item = stack.pop()
            popitems = list()
            # pop and keep all the items that are not the matching enditem
            while item != '(':
                popitems.append(item)
                item = stack.pop()

            # this will now be 3 elements with the first being a node, second * and third the weight            
            if isinstance(popitems[0], Node):
                popitems[0].weight = popitems[2]
                node = popitems[0]
            else:
                node = Node(weight = popitems[2], num=len(nodes), label=popitems[0])
                nodes.append(node)

            # push the node back on to the stack
            stack.append(node)
            i += 1
        elif elements[i] == ']':
            # should only be nodes on stack until '['
            item=stack.pop()
            function_nodes = list()
            while item != '[':
                function_nodes.append(item.num)
                item=stack.pop()

            # element after will be a node
            item = stack.pop()
            if not isinstance(item, Node):
                node = Node(num=len(nodes), label=item)
                nodes.append(node)
            else:
                node = item

            for n in function_nodes:
                nodes[n].to = node.num
            # when empty all nodes have been processed
            if not stack:
                break
            else:
                stack.append(node)
                i += 1
        else:
            stack.append(elements[i])
            i+=1

    return nodes

construct_nodes_sr(modelstr)

Returns node objects representing the network

Parameters:

    modelstr: String containing GE symbolic regression model

Returns:

    nodes constructed from the model

Source code in src/athenage/utilities/data_processing.py
def construct_nodes_sr(modelstr:str) -> list:
    """
    Returns node objects representing the network

     Parameters:
        modelstr: String containing GE symbolic regression model
     Returns
       nodes constructed from the model
    """ 

    model = modelstr.replace('activate(','')
    model = model[:-1]
    postfix_stack = infix_to_postfix(model)

    operators = {"+":2,"-":2,"*":2,"pdiv":2}
    stack = deque()
    nodes=[]

    for i in range(len(postfix_stack)):
        if postfix_stack[i] not in operators:
            nodes.append(Node(label=postfix_stack[i], num=len(nodes)))
            stack.append(nodes[-1])
        else:
            m = operators[postfix_stack[i]]
            for j in range(m):
                n = stack.pop()
                n.to = len(nodes)
            nodes.append(Node(label=postfix_stack[i], num=len(nodes)))
            stack.append(nodes[-1])
    return nodes

format_number(num, max_decimals=2)

Formats a number as a string with a maximum number of decimals, only if needed.

Source code in src/athenage/utilities/data_processing.py
def format_number(num, max_decimals=2):
    """Formats a number as a string with a maximum number of decimals, only if needed."""
    return f"{num:.{max_decimals}f}".rstrip('0').rstrip('.')
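
Example usage, assuming the function is imported from athenage.utilities.data_processing:

from athenage.utilities.data_processing import format_number

print(format_number(3.14159))               # '3.14'
print(format_number(2.0))                   # '2'
print(format_number(0.5, max_decimals=3))   # '0.5'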

generate_splits(ncvs, fitness_type, df, have_test_file=False, test_df=None, rand_seed=1234)

Generate splits for training and testing based on number of cross-validation intervals requested.

Parameters:

    ncvs (int, required): number of splits (cross-validations)
    fitness_type (str, required): for 'r-squared' split into the specified number of folds, otherwise split balancing classes in the data
    df (DataFrame, required): dataset to use for splitting
    have_test_file (bool, default False): when true, use test_df as the testing set
    test_df (DataFrame, default None): when using a test file, contains the testing dataset
    rand_seed (int, default 1234): controls the split

Returns:

    train_splits (ndarray): 2-D array of indexes to use in training
    test_splits (ndarray): 2-D array of indexes to use in testing
    df (DataFrame): dataset to use with these indexes, concatenated for training and testing when a test dataset is provided

Source code in src/athenage/utilities/data_processing.py
def generate_splits(ncvs: int, fitness_type: str, df: pd.DataFrame, have_test_file: bool=False, test_df: pd.DataFrame=None, 
                    rand_seed: int=1234) -> tuple[np.ndarray,np.ndarray,pd.DataFrame]:
    """Generate splits for training and testing based on number of cross-validation intervals
        requested.

    Args:
        ncvs: number of splits (cross-validations)
        fitness_type: for 'r-squared' split into specified number of folds, otherwise split balancing classes in data
        df: dataset to use for splitting
        have_test_file: when true use the test_df as the testing set
        test_df: when using a test_file contains the testing dataset
        rand_seed: controls split


    Returns: 
        train_splits: 2-D array of indexes to use in training
        test_splits: 2-D array of indexes to use in testing
        df: dataset to use with these indexes, concatenated for training and testing when test dataset provided
    """
    if ncvs > 1:
        if fitness_type== 'r-squared':
            (train_splits, test_splits) = split_kfolds(df, ncvs, 
                rand_seed)
        else:
            (train_splits, test_splits) = split_statkfolds(df, ncvs, 
                rand_seed)
    else:
        train_splits = np.zeros((1,df.shape[0]))
        train_splits[0] = np.array([i for i in range(df.shape[0])])
        if not have_test_file:
            test_splits = np.zeros((1,0))
        else:
            test_splits = np.zeros((1, test_df.shape[0]))
            test_splits[0] = np.array([i for i in range(df.shape[0], test_df.shape[0] + df.shape[0])])
            df = pd.concat([df, test_df], axis=0)

    return train_splits, test_splits, df
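
A minimal sketch of calling generate_splits on a toy dataframe. The column layout ('y', 'x0', 'x1', ...) assumes the data has already been processed by read_input_files and rename_variables, and the fitness_type value is a placeholder (anything other than 'r-squared' takes the class-balanced path).

import numpy as np
import pandas as pd
from athenage.utilities.data_processing import generate_splits

df = pd.DataFrame({'y': [0, 1] * 50,
                   'x0': np.random.rand(100),
                   'x1': np.random.rand(100)})

# ncvs folds of row indexes; classes in 'y' are balanced across folds
train_splits, test_splits, df = generate_splits(ncvs=5, fitness_type='balanced_acc', df=df)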

prepare_split_data(df, train_indexes, test_indexes)

Create and return data arrays for training and testing using indexes passed.

Parameters:

    df (DataFrame, required): data set to split
    train_indexes (ndarray, required): rows in dataset to make training set
    test_indexes (ndarray, required): rows in dataset to make test set

Returns:

    X_train (ndarray): x values in training
    Y_train (ndarray): y values in training
    X_test (ndarray): x values for testing
    Y_test (ndarray): y values for testing

Source code in src/athenage/utilities/data_processing.py
def prepare_split_data(df: pd.DataFrame, train_indexes: np.ndarray, 
                       test_indexes: np.ndarray) -> tuple[np.ndarray,np.ndarray,np.ndarray,np.ndarray]:
    """Create and return data arrays for training and testing using indexes passed.

    Args:
        df: data set to split
        train_indexes: rows in dataset to make training set
        test_indexes: rows in dataset to make test set

    Returns: 
        X_train: x values in training
        Y_train: y values in training
        X_test: x values for testing
        Y_test: y values for testing
    """
    traindf = df.iloc[train_indexes]
    testdf = df.iloc[test_indexes]

    train_rows = traindf.shape[0]
    train_cols = traindf.shape[1]-1

    X_train = np.zeros([train_rows,train_cols], dtype=float)
    Y_train = np.zeros([train_rows,], dtype=float)
    for i in range(train_rows):
        for j in range(train_cols):
            X_train[i,j] = traindf['x'+str(j)].iloc[i]
    for i in range(train_rows):
        Y_train[i] = traindf['y'].iloc[i]

    test_rows=testdf.shape[0]
    test_cols=testdf.shape[1]-1

    X_test = np.zeros([test_rows,test_cols], dtype=float)
    Y_test = np.zeros([test_rows,], dtype=float)
    for i in range(test_rows):
        for j in range(test_cols):
            X_test[i,j] = testdf['x'+str(j)].iloc[i]
    for i in range(test_rows):
        Y_test[i] = testdf['y'].iloc[i]

    X_train = np.transpose(X_train)
    X_test = np.transpose(X_test)

    return X_train,Y_train,X_test,Y_test
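
A minimal sketch with hand-built index arrays, assuming the 'y'/'x0'/'x1' layout described above; note that the returned X arrays are transposed to shape (n_variables, n_rows).

import numpy as np
import pandas as pd
from athenage.utilities.data_processing import prepare_split_data

df = pd.DataFrame({'y': [0.0, 1.0, 0.0, 1.0],
                   'x0': [0.1, 0.2, 0.3, 0.4],
                   'x1': [1.0, 2.0, 3.0, 4.0]})

X_train, Y_train, X_test, Y_test = prepare_split_data(df, np.array([0, 1, 2]), np.array([3]))
print(X_train.shape, Y_train.shape)   # (2, 3) (3,)
print(X_test.shape, Y_test.shape)     # (2, 1) (1,)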

process_continfile(fn, scale, missing=None, included_vars=None)

Read in continuous data and construct dataframe from values

Parameters:

    fn (str, required): Phenotypes (outcomes) filename
    scale (bool, required): normalize values if true
    missing (str, default None): identifies any missing data in file
    included_vars (list[str], default None): restrict set to only variables (column names) in list

Returns:

    DataFrame: pandas dataframe

Source code in src/athenage/utilities/data_processing.py
def process_continfile(fn: str, scale: bool, missing: str=None, included_vars: list[str]=None) -> pd.DataFrame:
    """Read in continuous data and construct dataframe from values

    Args:
        fn: Phenotypes (outcomes) filename
        scale: normalize values if true
        missing: identifies any missing data in file
        included_vars: restrict set to only variables (column names) in list

    Returns: 
        pandas dataframe 
    """
    data = pd.read_table(fn, delim_whitespace=True, header=0, keep_default_na=False)

    if included_vars:
        data=data.loc[:, data.columns.isin(included_vars)]

    if missing:
        data.loc[:,data.columns!='ID'] = data.loc[:,data.columns!='ID'].replace(missing, np.nan)

    data.loc[:,data.columns!='ID'] = data.loc[:,data.columns!='ID'].astype(float)

    if scale:
        data.loc[:,data.columns!='ID'] = data.loc[:,data.columns!='ID'].apply(normalize, axis=0)

    return data
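
A hypothetical input file: whitespace-delimited with a header row, an ID column, and continuous variables; 'NA' marks missing values here.

from athenage.utilities.data_processing import process_continfile

with open("contin.txt", "w") as fh:                       # toy file for illustration
    fh.write("ID BMI AGE\n1 21.4 33\n2 NA 41\n")

df = process_continfile("contin.txt", scale=False, missing="NA")
print(df)   # ID plus BMI/AGE columns, with the missing BMI value set to NaN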

process_genofile(fn, encoding, missing=None, included_vars=None)

Read in genotype data and construct dataframe from values

Parameters:

    fn (str, required): genotype (SNP) data filename
    encoding (str, required): genotype encoding type
    missing (str, default None): identifies missing data in file
    included_vars (list[str], default None): restrict set to only variables in list

Returns:

    data (DataFrame): pandas dataframe
    geno_map (dict): dictionary with new label as key, original label as value

Source code in src/athenage/utilities/data_processing.py
def process_genofile(fn: str, encoding: str, missing: str=None, included_vars: list[str]=None) ->  tuple[pd.DataFrame, dict]:
    """Read in genotype data and construct dataframe from values

    Args:
        fn: Phenotypes (outcomes) filename
        encoding: Genotype encoding type
        missing: identifies missing data in file
        included_vars: restrict set to only variables in list

    Returns: 
        data: pandas dataframe
        geno_map: dictionary with new label as key, original label as value
    """
    data = pd.read_table(fn, delim_whitespace=True, header=0, keep_default_na=False)

    if included_vars:
        data=data.loc[:, data.columns.isin(included_vars)]

    if missing:
        # data.replace([missing], np.nan, inplace=True)
        data.loc[:,data.columns!='ID'] = data.loc[:,data.columns!='ID'].replace(missing, np.nan)

#     oldcols = list(data.drop('y', axis=1).columns)
    labels = list(data.columns)
    geno_map={}

    if encoding == 'additive':
        data = data.astype(str).apply(additive_encoding)
        geno_map = {labels[i]:labels[i] for i in range(0,len(labels))}

    if encoding == 'add_quad':
        orig_columns = data.loc[:,data.columns!='ID'].columns
        new_df = data[data.loc[:,data.columns!='ID'].columns.repeat(2)]

        columns = list(new_df.columns)
        columns[::2]= [ x + "_a" for x in new_df.columns[::2]]
        columns[1::2]= [ x + "_b" for x in new_df.columns[1::2]]

        # map back to original columns 
        geno_map = {columns[i]:orig_columns[(i)//2] for i in range(0,len(columns))}

        new_df.columns = columns
        add_quad_encoding(new_df)
        # add back ID column
        new_df.insert(0,"ID",data['ID'])
        data = new_df

    return data, geno_map

process_grammar_file(grammarfn, data)

Reads grammar file into string and adds all x variables present in dataframe

Parameters:

    grammarfn (str, required): grammar filename to read and modify
    data (DataFrame, required): dataset to be used with the grammar

Returns:

    updated_grammar (str): grammar text modified for the number of variables in data

Source code in src/athenage/utilities/data_processing.py
def process_grammar_file(grammarfn: str, data: pd.DataFrame) -> str:
    """Reads grammar file into string and adds all x variables present in dataframe

    Args:
        grammarfn: grammar filename to read and modify
        data: dataset to be used with the grammar

    Returns: 
        updated_grammar: grammar text modified for number of variables in data
    """
    with open(grammarfn, "r") as text_file:
        grammarstr = text_file.read()

    nvars = len([xcol for xcol in data.columns if 'x' in xcol])
    updated_grammar=""
    for i,line in enumerate(grammarstr.splitlines()):
        if re.search(r"^\s*<v>",line):
             line = "<v> ::= " + ' | '.join([f"x[{i}]" for i in range(nvars)])
        updated_grammar += line + "\n"
    return updated_grammar
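
An illustrative run showing how the <v> rule is expanded to cover every x variable in the dataframe; the grammar file here is a throwaway example.

import pandas as pd
from athenage.utilities.data_processing import process_grammar_file

df = pd.DataFrame({'y': [0.0], 'x0': [1.0], 'x1': [2.0], 'x2': [3.0]})

with open("toy.bnf", "w") as fh:          # hypothetical grammar file
    fh.write("<e> ::= <v> + <v>\n<v> ::= x[0]\n")

print(process_grammar_file("toy.bnf", df))
# <e> ::= <v> + <v>
# <v> ::= x[0] | x[1] | x[2]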

process_var_colormap(colorfn=None, node_color='lightgray', var_default='white')

Create color map for graphical output of networks. File format is tab-delimited with columns in the order category, color, inputs.

Parameters:

    colorfn (str, default None): name of file to process; when no file is provided only the network nodes (PA, PD, PM, PS, PAND, PNAND, POR, PXOR, PNOR) are included
    node_color (str, default 'lightgray'): color for the operator nodes
    var_default (str, default 'white'): default color for unspecified variables

Returns:

    color_map (ColorMapping): node name as key and color as value

Source code in src/athenage/utilities/data_processing.py
def process_var_colormap(colorfn: str=None, node_color: str='lightgray', 
    var_default: str='white') -> ColorMapping:
    """Create color map for graphical output of networks. Files format is tab-delimited
        in order of category, color, inputs

    Args:
        colorfn: name of file to process, when no fn provided only the network nodes
            (PA,PD,PM,PS,PAND,PNAND,POR,PXOR,PNOR) are included
        node_color: color for the operator nodes
        var_default: Default colors for unspecified variables

    Returns: 
        color_map: node name as key and color as value
    """
    color_map = ColorMapping(default_color=var_default, operator_color=node_color)

    color_map.add_category('netnodes', color_map.operator_color)
    color_map.add_nodes({'PA':color_map.operator_color,'PD':color_map.operator_color,
    'PM':color_map.operator_color,'PS':color_map.operator_color,'PAND':color_map.operator_color,
    'PNAND':color_map.operator_color, 'POR':color_map.operator_color, 'PNOR':color_map.operator_color,
    'PXOR':color_map.operator_color},'netnodes')

    # header for file is category,color,inputs
    if colorfn:
        with open(colorfn) as csv_file:
            #skip header
            heading = next(csv_file)
            reader = csv.reader(csv_file, delimiter='\t')
            # reader = csv.reader(csv_file, delim_whitespace=True)

            for row in reader:
                if not row:
                    continue
                # set category color
                color_map.add_category(row[0],row[1])
                for in_var in row[2:]:
                    for var in in_var.split():
                        color_map.add_input(var,row[0],row[1])

    return color_map
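
A sketch of the expected color-map file: tab-delimited, one category per row, with a header line that is skipped; the third and later columns may contain whitespace-separated variable names. File and variable names are hypothetical.

from athenage.utilities.data_processing import process_var_colormap

with open("colors.txt", "w") as fh:
    fh.write("category\tcolor\tinputs\n")              # header row (skipped)
    fh.write("genotype\tlightblue\trs1234 rs5678\n")
    fh.write("clinical\tsalmon\tBMI AGE\n")

color_map = process_var_colormap("colors.txt")
print(color_map.get_input_color("BMI"))      # 'salmon'
print(color_map.get_input_color("unknown"))  # falls back to the default color ('white')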

read_input_files(outcomefn, genofn, continfn, out_scale=False, contin_scale=False, geno_encode=None, missing=None, outcome=None, included_vars=None)

Read in data and construct pandas dataframe

Parameters:

    outcomefn (str, required): Phenotypes (outcomes) filename
    genofn (str, required): SNP values filename
    continfn (str, required): any continuous data filename
    out_scale (bool, default False): scale outcome values from 0 to 1.0
    contin_scale (bool, default False): scale each continuous variable from 0 to 1.0
    geno_encode (str, default None): encode genotype data; options are 'add_quad' and 'additive'
    missing (str, default None): identifies missing data in the input files
    outcome (str, default None): column header in the outcome file to use for 'y'
    included_vars (list[str], default None): list of variable names to include in analysis; all others excluded

Returns:

    dataset_df (DataFrame): pandas dataframe
    inputs_map (dict): dictionary with new label as key, original label as value
    unmatched (list): list of IDs that are not in all input files

Source code in src/athenage/utilities/data_processing.py
def read_input_files(outcomefn: str, genofn: str, continfn: str, out_scale: bool=False,
    contin_scale: bool=False, geno_encode: str=None, missing: str=None, outcome: str=None,
    included_vars: list[str]=None) -> tuple[pd.DataFrame, dict, list]:
    """Read in data and construct pandas dataframe

    Args:
        outcomefn: Phenotypes (outcomes) filename
        genofn: SNP values filename
        continfn: any continuous data filename
        out_scale: scale outcome values from 0 to 1.0
        contin_scale: scale each continuous variable from 0 to 1.0
        geno_encode: encode genotype data. options are 'add_quad' and 'additive'
        outcome: column header in continfn to use for 'y'
        included_vars: list of variable names to include in analysis; all others excluded

    Returns:
        dataset_df: pandas dataframe
        inputs_map: dictionary with new label as key, original label as value
        unmatched: list of IDs that are not in all input files
    """

    y_df = process_continfile(outcomefn, out_scale)

    if outcome is None:
        dataset_df = y_df[['ID',y_df.columns[1]]]
    else:
        dataset_df = y_df[['ID',outcome]]

    dataset_df.columns = ['ID', 'y']

    if included_vars:
        included_vars.insert(0, 'ID')

    contin_df = None
    inputs_map = {}
    if continfn:
        contin_df = process_continfile(continfn, contin_scale, missing, included_vars)
        inputs_map={contin_df.columns[i]:contin_df.columns[i] for i in range(0,len(contin_df.columns))}

    if genofn:
        geno_df, geno_map = process_genofile(genofn, geno_encode, missing, included_vars)
        inputs_map.update(geno_map)

    dataset_df = dataset_df.sort_values('ID', ascending=False)
    unmatched = []

    if genofn:
        unmatched.extend(dataset_df[~dataset_df['ID'].isin(geno_df['ID'])]['ID'].tolist())
        unmatched.extend(geno_df[~geno_df['ID'].isin(dataset_df['ID'])]['ID'].tolist())
        dataset_df = pd.merge(dataset_df,geno_df,on="ID", validate='1:1')

    if continfn:
        unmatched.extend(dataset_df[~dataset_df['ID'].isin(contin_df['ID'])]['ID'].tolist())
        unmatched.extend(contin_df[~contin_df['ID'].isin(dataset_df['ID'])]['ID'].tolist())
        dataset_df = pd.merge(dataset_df, contin_df, on="ID", validate='1:1')

    dataset_df.drop(columns=['ID'], inplace=True)

    return dataset_df, inputs_map, unmatched
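
A hypothetical call; the file names are placeholders for whitespace-delimited files that share an ID column.

from athenage.utilities.data_processing import read_input_files

dataset_df, inputs_map, unmatched = read_input_files(
    outcomefn="outcomes.txt",      # placeholder file names
    genofn="genotypes.txt",
    continfn=None,
    geno_encode="additive",
    missing="NA",
)
# dataset_df has a 'y' column plus the (possibly re-encoded) input variables;
# unmatched lists IDs that were not present in every input file.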

rename_variables(df)

Rename variables in dataframe to be indexed version of x

Parameters:

    df (DataFrame, required): dataframe to alter

Returns:

    vmap (dict): new names are keys and original names are values

Source code in src/athenage/utilities/data_processing.py
def rename_variables(df: pd.DataFrame) -> dict:
    """ Rename variables in dataframe to be indexed version of x

    Args:
        df: dataframe to alter

    Returns:
        vmap: new names are keys and original names are values

    """
    newcols = {}
    vmap = {}
    oldcols = list(df.drop('y', axis=1).columns)
    for i in range(len(oldcols)):
        newvar = 'x' + str(i)# + ']'
        newcols[oldcols[i]]=newvar
        vmap['x['+str(i) + ']']=oldcols[i]

    df.rename(newcols, inplace=True, axis=1)

    return vmap
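
Example on a small dataframe; the dataframe is renamed in place and the returned map keys use the indexed x[..] form.

import pandas as pd
from athenage.utilities.data_processing import rename_variables

df = pd.DataFrame({'BMI': [21.4], 'rs1234': [1.0], 'y': [0.0]})
vmap = rename_variables(df)

print(list(df.columns))   # ['x0', 'x1', 'y']
print(vmap)               # {'x[0]': 'BMI', 'x[1]': 'rs1234'}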

reset_variable_names(model, vmap)

Replace x variables with names in variable map

Parameters:

    model (str, required): evolved model containing variables with indexed x values (x[0], x[1], ...)
    vmap (dict, required): dict with key as x variable and value as the name to replace it with

Returns:

    string (str): model string with variable names updated

Source code in src/athenage/utilities/data_processing.py
def reset_variable_names(model: str, vmap: dict) -> str:
    """Replace x variables with names in variable map

    Args:
        model: evolved model containing variables with indexed x values (x[0], x[1], ...)
        vmap: dict with key as x variable and value as name to replace with

    Returns: 
        string: model string with variable names updated
    """
    return re.sub(r"((x\[\d+\]))", lambda g: vmap[g.group(1)], model)
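
Example usage; the model string is an illustrative phenotype, and vmap is the map produced by rename_variables.

from athenage.utilities.data_processing import reset_variable_names

vmap = {'x[0]': 'BMI', 'x[1]': 'rs1234'}
print(reset_variable_names("PA(0.50 * x[0], 1.20 * x[1])", vmap))
# PA(0.50 * BMI, 1.20 * rs1234)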

write_plots(basefn, best_models, var_map, inputs_map, color_map)

Produces PNG files displaying the best models, one per cross-validation.

Parameters:

    basefn (str, required): base name of the files to write
    best_models (list[Individual], required): deap Individual objects from the run
    var_map (dict, required): key is value (x[0], x[1], etc.) and value is the name from the dataset adjusted for multiple occurrences (Ott encoding)
    inputs_map (dict, required): key is name (adjusted for Ott encoding), value is original column name in the input dataset
    color_map (ColorMapping, required): contains colors to use in the plot

Returns:

    None

Source code in src/athenage/utilities/data_processing.py
def write_plots(basefn: str, best_models: list['deap.Creator.Individual'], var_map: dict, 
                inputs_map: dict, color_map: ColorMapping) -> None:
    """Produces png file displaying best models with one per cross-validation.

    Parameters:
        basefn: name of file to write
        best_models: deap Individual objects from run
        var_map: key is value (x[0],x[1],etc) and value is name from dataset adjusted for multiple occurrences (Ott encoding)
        inputs_map: key is name (adjusted for Ott encoding), value is original column name in input dataset
        color_map: contains colors to use in plot


    Returns: 
        None
    """

    inputs_map.update({'PA':'PA', 'PM':'PM', 'PS':'PS','PD':'PD','PAND':'PAND','PNAND':'PNAND',
                       'POR':'POR','PNOR':'PNOR','PXOR':'PXOR'})
    for cv,model in enumerate(best_models,1):
        compressed = compress_weights(model.phenotype)
        modelstr = reset_variable_names(compressed, var_map)
        nodes = construct_nodes(modelstr)
        finalindex = len(nodes)-1
        node_labels={}
        edge_labels={}
        node_colors={}
        categories = set()
        node_size=8

        for node in nodes:
            node.num = abs(node.num - finalindex)
            node_labels[node.num] = node.label
            # possible colors: https://matplotlib.org/stable/users/explain/colors/colors.html
            if node.label in inputs_map:
                node_colors[node.num] = color_map.get_input_color(inputs_map[node.label])
            else:
                node_colors[node.num] = color_map.get_input_color(node.label)

            if node_colors[node.num] != color_map.default_color and \
                node_colors[node.num]  != color_map.operator_color:
                categories.add(color_map.inputs[inputs_map[node.label]].category)

            if node.to is not None:
                node.to = abs(node.to - finalindex)
                if node.weight:
                    edge_labels[(node.num,node.to)]="{weight:.2f}".format(weight=float(node.weight))

        edges = []
        for node in nodes:
            if node.to is not None:
                edges.append((node.num,node.to))
        plt.clf()
        fig, ax = plt.subplots()
        Graph(edges, node_layout='dot', arrows=True, node_labels = node_labels, 
            edge_labels=edge_labels, node_color=node_colors, node_size=node_size, ax=ax,
            scale=(2,2), edge_label_fontdict=dict(size=6), node_label_fontdict=dict(size=4))
            # node_label_offset=(0,0.1))


        if len(categories) > 0:
            # add legend
            node_proxy_artists = []
            for cat in categories:
                proxy =  plt.Line2D(
                    [], [],
                    linestyle='None',
                color=color_map.get_category_color(cat),
                marker='s',
                markersize=node_size,#//1.25,
                label=cat
                )
                node_proxy_artists.append(proxy)

            node_legend = ax.legend(handles=node_proxy_artists, loc='lower left', fontsize=7)#, title='Categories')
            ax.add_artist(node_legend)

        outputfn = basefn + ".cv" + str(cv) + ".png"
        plt.title("\n".join(textwrap.wrap(modelstr, 80)), fontsize=8)
        plt.savefig(outputfn, dpi=300)

write_summary(filename, best_models, score_type, var_map, fitness_test, nmissing)

Produce summary file reporting results

Parameters:

    filename (str, required): name of file to write
    best_models (list[Individual], required): deap Individual objects from run
    score_type (str, required): test used for scoring individuals
    var_map (dict, required): key is value (x[0], x[1], etc.) and value is original column name in dataset
    fitness_test (list[float], required): contains testing fitness scores for each individual
    nmissing (list[int], required): number of missing rows for each individual

Returns:

    None

Source code in src/athenage/utilities/data_processing.py
def write_summary(filename: str, best_models: list['deap.creator.Individual'], score_type: str, var_map: dict, 
                  fitness_test: list[float],nmissing: list[int]) -> None:
    """Produce summary file reporting results

    Args:
        filename: name of file to write
        best_models: deap Individual objects from run
        score_type: test used for scoring individuals
        var_map: key is value (x[0],x[1],etc) and value is original column name in dataset
        fitness_test: contains testing fitness scores for each individual
        nmissing: number of missing rows for individual


    Returns: 
        None
    """

    header = f"CV\tVariables\t{score_type} Training\tTesting\tTraining-missing\tTesting-missing\n"

    fh = open(filename, "w")
    fh.write(header)

    pattern = re.compile(r"(x\[\d+\])")


    for i,model in enumerate(best_models):
        fh.write(f"{i+1}\t")
        # extract variables from model
        for match in pattern.finditer(model.phenotype):
            fh.write(f"{var_map[match.group(1)]} ")

        fh.write(f"\t{model.fitness.values[0]}")
        fh.write(f"\t{fitness_test[i]}")
        fh.write(f"\t{nmissing[i][0] * 100:.2f}%")
        fh.write(f"\t{nmissing[i][1] * 100:.2f}%")
        fh.write("\n")


    fh.write("\nCV\tModel\n")
    for i,model in enumerate(best_models):
        compressed = compress_weights(model.phenotype)
        compressed = reset_variable_names(compressed, var_map)
        fh.write(f"{i+1}\t{compressed}\n")

    fh.write("\n***** Original Networks *****")
    fh.write("\nCV\tModel\n")
    for i,model in enumerate(best_models):
        fh.write(f"{i+1}\t{model.phenotype}\n")

    fh.close()