Data processing

Provides methods for the ATHENA project for processing input files, transforming data, and writing reports and plots.

Reads data files and transforms data

compress_weights(model_str)

Compresses weights/constants to simplify the model string

Source code in src/athenage/utilities/data_processing.py
def compress_weights(model_str):
    """Compresses weights/constants to simplify the model string"""
    # note: the original pattern used a character class ([PA|PD|...]), which matches
    # single characters; an alternation over the operator tokens is what is intended
    if re.search(r"PA\(|PD\(|PM\(|PS\(|PAND\(|PNAND\(|POR\(|PXOR\(|PNOR\(", model_str):
        return compress_weights_nn(model_str)
    else:
        return compress_weights_sr(model_str)
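
A minimal usage sketch, assuming the module is importable as athenage.utilities.data_processing; the model strings below are illustrative, not output from a real run:

from athenage.utilities.data_processing import compress_weights

nn_model = "PA([x[0] * 0.31 * 1.0, x[1] * -0.82 * 1.0])"  # neural-network style model
sr_model = "activate(x[0] + pdiv(x[1], 2.0))"             # symbolic-regression style model

print(compress_weights(nn_model))  # operator token present: dispatches to compress_weights_nn
print(compress_weights(sr_model))  # no operator token: dispatches to compress_weights_sr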

construct_nodes(modelstr)

Returns node objects representing the network

Parameters:

    modelstr (str): String containing GE network

Returns:

    nodes (list): node objects constructed from the model

Source code in src/athenage/utilities/data_processing.py
def construct_nodes(modelstr: str) -> list:
    """
    Returns node objects representing the network

    Parameters:
        modelstr: String containing GE network

    Returns:
        nodes constructed from the model
    """
    # note: a stray ']' ended the original pattern; it is removed here
    if re.search(r"PA\(|PS\(|PD\(|PM\(|PAND\(|PNAND\(|POR\(|PXOR\(|PNOR\(", modelstr):
        return construct_nodes_nn(modelstr)
    else:
        return construct_nodes_sr(modelstr)

construct_nodes_nn(modelstr)

Returns node objects representing the network

Parameters:

    modelstr (str): String containing GE neural network model

Returns:

    nodes (list): node objects constructed from the model

Source code in src/athenage/utilities/data_processing.py
def construct_nodes_nn(modelstr: str) -> list:
    """
    Returns node objects representing the network

    Parameters:
        modelstr: String containing GE neural network model

    Returns:
        nodes constructed from the model
    """

    model = modelstr.replace('([', ' [').replace('])', '] ').replace('(', ' ( ').replace(')', ' ) ')
    ignore = {','}
    elements = model.split()

    stack = deque()

    stack.append(elements[0])
    i = 1
    nodes = []

    # use stack to construct the nodes/edges
    while stack:
        if elements[i] in ignore:
            i+=1
        elif elements[i] == ')':
            enditem = elements[i]
            item = stack.pop()
            popitems = list()
            # pop and keep all the items that are not the matching enditem
            while item != '(':
                popitems.append(item)
                item = stack.pop()

            # popitems now holds three elements: the node (or its label), '*', and the weight
            if isinstance(popitems[0], Node):
                popitems[0].weight = popitems[2]
                node = popitems[0]
            else:
                node = Node(weight = popitems[2], num=len(nodes), label=popitems[0])
                nodes.append(node)

            # push the node back on to the stack
            stack.append(node)
            i += 1
        elif elements[i] == ']':
            # should only be nodes on stack until '['
            item=stack.pop()
            function_nodes = list()
            while item != '[':
                function_nodes.append(item.num)
                item=stack.pop()

            # element after will be a node
            item = stack.pop()
            if not isinstance(item, Node):
                node = Node(num=len(nodes), label=item)
                nodes.append(node)
            else:
                node = item

            for n in function_nodes:
                nodes[n].to = node.num
            # when empty all nodes have been processed
            if not stack:
                break
            else:
                stack.append(node)
                i += 1
        else:
            stack.append(elements[i])
            i+=1

    return nodes

construct_nodes_sr(modelstr)

Returns node objects representing the network

Parameters:

    modelstr (str): String containing GE symbolic regression model

Returns:

    nodes (list): node objects constructed from the model

Source code in src/athenage/utilities/data_processing.py
def construct_nodes_sr(modelstr: str) -> list:
    """
    Returns node objects representing the network

    Parameters:
        modelstr: String containing GE symbolic regression model

    Returns:
        nodes constructed from the model
    """

    model = modelstr.replace('activate(','')
    model = model[:-1]
    postfix_stack = infix_to_postfix(model)

    operators = {"+":2,"-":2,"*":2,"pdiv":2}
    stack = deque()
    nodes=[]

    for i in range(len(postfix_stack)):
        if postfix_stack[i] not in operators:
            nodes.append(Node(label=postfix_stack[i], num=len(nodes)))
            stack.append(nodes[-1])
        else:
            m = operators[postfix_stack[i]]
            for j in range(m):
                n = stack.pop()
                n.to = len(nodes)
            nodes.append(Node(label=postfix_stack[i], num=len(nodes)))
            stack.append(nodes[-1])
    return nodes
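
The postfix walk above is easiest to follow on a tiny expression. Below is a self-contained illustration of the same stack algorithm, using a minimal stand-in for the module's Node class and a hand-written postfix list (infix_to_postfix is a module internal):

from collections import deque
from dataclasses import dataclass

@dataclass
class MiniNode:              # stand-in for the module's Node class
    label: str
    num: int
    to: int | None = None    # index of the node this one feeds into

# postfix form of "x[0] + pdiv(x[1], 2.0)", normally produced by infix_to_postfix
postfix = ["x[0]", "x[1]", "2.0", "pdiv", "+"]
operators = {"+": 2, "-": 2, "*": 2, "pdiv": 2}

stack, nodes = deque(), []
for tok in postfix:
    if tok in operators:
        for _ in range(operators[tok]):  # wire each operand into this operator
            stack.pop().to = len(nodes)
    nodes.append(MiniNode(label=tok, num=len(nodes)))
    stack.append(nodes[-1])

for n in nodes:
    print(n)  # x[0] feeds '+', x[1] and 2.0 feed 'pdiv', 'pdiv' feeds '+'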

format_number(num, max_decimals=2)

Formats a number as a string with a maximum number of decimals, only if needed.

Source code in src/athenage/utilities/data_processing.py
def format_number(num, max_decimals=2):
    """Formats a number as a string with a maximum number of decimals, only if needed."""
    return f"{num:.{max_decimals}f}".rstrip('0').rstrip('.')

generate_splits(ncvs, fitness_type, df, have_test_file=False, test_df=None, rand_seed=1234)

Generate splits for training and testing based on number of cross-validation intervals requested.

Parameters:

    ncvs (int): number of splits (cross-validations). Required.
    fitness_type (str): for 'r-squared', split into the specified number of folds; otherwise split balancing the classes in the data. Required.
    df (DataFrame): dataset to use for splitting. Required.
    have_test_file (bool): when true, use test_df as the testing set. Default: False.
    test_df (DataFrame): when using a test file, contains the testing dataset. Default: None.
    rand_seed (int): controls the split. Default: 1234.

Returns:

    train_splits (ndarray): 2-D array of indexes to use in training
    test_splits (ndarray): 2-D array of indexes to use in testing
    df (DataFrame): dataset to use with these indexes, concatenated for training and testing when a test dataset is provided

Source code in src/athenage/utilities/data_processing.py
def generate_splits(ncvs: int, fitness_type: str, df: pd.DataFrame, have_test_file: bool=False, test_df: pd.DataFrame=None, 
                    rand_seed: int=1234) -> tuple[np.ndarray,np.ndarray,pd.DataFrame]:
    """Generate splits for training and testing based on number of cross-validation intervals
        requested.

    Args:
        ncvs: number of splits (cross-validations)
        fitness_type: for 'r-squared' split into specified number of folds, otherwise split balancing classes in data
        df: dataset to use for splitting
        have_test_file: when true use the test_df as the tesing set
        test_df: when using a test_file contains the testing dataset
        rand_seed: controls split


    Returns: 
        train_splits: 2-D array of indexes to use in traininig
        test_splits: 2-D array of indexes to use in testing
        df: dataset to use with these indexes, concatenated for training and testing when test dataset provided
    """
    if ncvs > 1:
        if fitness_type== 'r-squared':
            (train_splits, test_splits) = split_kfolds(df, ncvs, 
                rand_seed)
        else:
            (train_splits, test_splits) = split_statkfolds(df, ncvs, 
                rand_seed)
    else:
        train_splits = np.zeros((1,df.shape[0]))
        train_splits[0] = np.array([i for i in range(df.shape[0])])
        if not have_test_file:
            test_splits = np.zeros((1,0))
        else:
            test_splits = np.zeros((1, test_df.shape[0]))
            test_splits[0] = np.array([i for i in range(df.shape[0], test_df.shape[0] + df.shape[0])])
            df = pd.concat([df, test_df], axis=0)

    return train_splits, test_splits, df
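
A hedged usage sketch: the toy DataFrame is illustrative, and the fitness_type value below stands in for any value other than 'r-squared', which routes to the class-balancing split (split_kfolds and split_statkfolds are module internals):

import numpy as np
import pandas as pd

df = pd.DataFrame({"x0": np.arange(6, dtype=np.float32),
                   "y":  [0, 1, 0, 1, 0, 1]})

train_splits, test_splits, df = generate_splits(
    ncvs=3, fitness_type="balanced_acc", df=df, rand_seed=42)

print(len(train_splits), len(test_splits))  # one set of row indexes per fold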

prepare_split_data(df, train_indexes, test_indexes)

Create and return data arrays for training and testing using indexes passed.

Parameters:

    df (DataFrame): data set to split. Required.
    train_indexes (ndarray): rows in dataset to make the training set. Required.
    test_indexes (ndarray): rows in dataset to make the test set. Required.

Returns:

    X_train (ndarray): x values in training
    Y_train (ndarray): y values in training
    X_test (ndarray): x values for testing
    Y_test (ndarray): y values for testing

Source code in src/athenage/utilities/data_processing.py
def prepare_split_data(df: pd.DataFrame, train_indexes: np.ndarray, 
                       test_indexes: np.ndarray) -> tuple[np.ndarray,np.ndarray,np.ndarray,np.ndarray]:
    """Create and return data arrays for training and testing using indexes passed.

    Args:
        df: data set to split
        train_indexes: rows in dataset to make training set
        test_indexes: rows in dataset to make test set

    Returns: 
        X_train: x values in training
        Y_train: y values in training
        X_test: x values for testing
        Y_test: y values for testing
    """

    traindf = df.iloc[train_indexes]
    testdf  = df.iloc[test_indexes]

    # Assume 'y' is the label column, everything else is features
    X_train = traindf.drop(columns='y').to_numpy(dtype=np.float32).T   # transpose directly
    Y_train = traindf['y'].to_numpy(dtype=np.float32)

    X_test  = testdf.drop(columns='y').to_numpy(dtype=np.float32).T
    Y_test  = testdf['y'].to_numpy(dtype=np.float32)

    return X_train, Y_train, X_test, Y_test
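
A small usage example with an illustrative DataFrame (the label column must be named 'y'):

import numpy as np
import pandas as pd

df = pd.DataFrame({"x0": [0.1, 0.2, 0.3, 0.4],
                   "x1": [1.0, 2.0, 3.0, 4.0],
                   "y":  [0.0, 1.0, 0.0, 1.0]})

X_train, Y_train, X_test, Y_test = prepare_split_data(
    df, train_indexes=np.array([0, 1, 2]), test_indexes=np.array([3]))

print(X_train.shape)  # (2, 3): features become rows after the transpose
print(Y_test)         # [1.]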

process_continfile(fn, scale, missing=None, included_vars=None, max_missing_fraction=None)

Read in continuous data and construct dataframe from values

Parameters:

    fn (str): Phenotypes (outcomes) filename. Required.
    scale (bool): normalize values if true. Required.
    missing (str): identifies any missing data in the file. Default: None.
    included_vars (list[str]): restrict set to only variables (column names) in list. Default: None.
    max_missing_fraction (float | None): drop numeric columns where more than this fraction is missing (0–1). If None, keep all columns. Default: None.

Returns:

    DataFrame: pandas dataframe

Source code in src/athenage/utilities/data_processing.py
def process_continfile(fn: str, scale: bool, missing: str=None, included_vars: list[str]=None, max_missing_fraction: float | None = None) -> pd.DataFrame:
    """Read in continuous data and construct dataframe from values

    Args:
        fn: Phenotypes (outcomes) filename
        scale: normalize values if true
        missing: identifies any missing data in file
        included_vars: restrict set to only variables (column names) in list
        max_missing_fraction: drop numeric columns where > this fraction are missing (0–1).
                              If None, keep all columns (default).

    Returns: 
        pandas dataframe 
    """

    with open(fn) as f:
        header_line = f.readline().strip()
        header = header_line.split()
        first_data_line = f.readline().strip()
        first_data = first_data_line.split()

    if len(first_data) != len(header):
        raise ValueError(
            f"Column mismatch in {fn}: header has {len(header)} columns but first data row has {len(first_data)}.\n"
            f"Header (first 10 cols): {header[:10]}\n"
            f"First row (first 10 cols): {first_data[:10]}"
        )

    # --- Stream + replace missing into a temp file to reduce memory use on large files ---
    with tempfile.NamedTemporaryFile(mode="w+", delete=False) as tmp:
        tmp.write(" ".join(header) + "\n")  # write header first
        with open(fn) as src:
            # skip only the header line; the original skipped any line starting with
            # the first header token, which could also drop matching data rows
            next(src)
            for line in src:
                if missing is not None:
                    if isinstance(missing, (list, tuple, set)):
                        for m in missing:
                            line = line.replace(m, "nan")
                    else:
                        line = line.replace(missing, "nan")
                tmp.write(line)
        tmp_name = tmp.name
    try:
        # --- Load columns ---
        ids = np.loadtxt(tmp_name, dtype=str, skiprows=1, usecols=(0,))
        data = np.loadtxt(tmp_name, dtype=np.float32, skiprows=1, usecols=range(1, len(header)))
    finally:
        #  Clean up temp file
        os.remove(tmp_name)

    # --- Convert to DataFrame ---
    df = pd.DataFrame(data, columns=header[1:])
    df.insert(0, header[0], ids)

    # --- Restrict columns if needed ---
    if included_vars:
        keep_cols = [header[0]] + [c for c in included_vars if c in df.columns]
        df = df[keep_cols]

    # --- Drop columns with too many missing values ---
    if max_missing_fraction is not None:
        num = df.iloc[:, 1:].to_numpy(copy=False)
        missing_frac = np.mean(np.isnan(num), axis=0)
        keep_mask = missing_frac <= max_missing_fraction
        cols_to_keep = [df.columns[0]] + df.columns[1:][keep_mask].tolist()
        dropped_cols = df.columns[1:][~keep_mask].tolist()
        if len(cols_to_keep) == 1:
            raise ValueError(
                f"All non-ID columns were dropped due to missing fraction > {max_missing_fraction}. "
                f"Dropped columns: {dropped_cols}"
            )
        df = df[cols_to_keep]
        print(f"Dropped columns (>{max_missing_fraction:.2f} missing): {dropped_cols}")

    # --- Min–max normalize ---
    if scale:
        num = df.iloc[:, 1:].to_numpy(copy=False)
        col_min = np.nanmin(num, axis=0)
        col_max = np.nanmax(num, axis=0)
        diff = col_max - col_min
        diff[diff == 0] = 1
        df.iloc[:, 1:] = (num - col_min) / diff

    return df
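
A hedged usage sketch, writing a small whitespace-delimited file and reading it back (the file contents are illustrative):

import tempfile

contents = "ID v1 v2\nS1 0.5 10\nS2 -9 20\nS3 1.5 30\n"
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write(contents)
    fname = f.name

df = process_continfile(fname, scale=True, missing="-9")
print(df)  # v1 and v2 min-max scaled to [0, 1]; the -9 entry becomes NaN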

process_genofile(fn, encoding, missing=None, included_vars=None, max_missing_fraction=None)

Read in genotype data and construct dataframe from values

Parameters:

    fn (str): genotype data filename. Required.
    encoding (str): genotype encoding type. Required.
    missing (str): identifies missing data in the file. Default: None.
    included_vars (list[str]): restrict set to only variables in list. Default: None.
    max_missing_fraction (float | None): drop numeric columns where more than this fraction is missing (0–1). If None, keep all columns. Default: None.

Returns:

    data (DataFrame): pandas dataframe
    geno_map (dict): dictionary with new label as key, original label as value

Source code in src/athenage/utilities/data_processing.py
def process_genofile(fn: str, encoding: str, missing: str=None, included_vars: list[str]=None, max_missing_fraction: float | None = None) -> tuple[pd.DataFrame, dict]:
    """Read in genotype data and construct dataframe from values

    Args:
        fn: Genotype data filename
        encoding: Genotype encoding type
        missing: identifies missing data in file
        included_vars: restrict set to only variables in list
        max_missing_fraction: drop numeric columns where > this fraction are missing (0–1).
                              If None, keep all columns (default).

    Returns: 
        data: pandas dataframe
        geno_map: dictionary with new label as key, original label as value
    """
    # sep=r"\s+" replaces the deprecated delim_whitespace=True option
    data = pd.read_table(fn, sep=r"\s+", header=0, keep_default_na=False)

    if included_vars:
        data = data.loc[:, data.columns.isin(included_vars)]

    if missing:
        data.loc[:, data.columns != 'ID'] = data.loc[:, data.columns != 'ID'].replace(missing, np.nan)

    # --- Drop columns with too many missing values ---
    if max_missing_fraction is not None:
        # note: this block referenced an undefined 'df' in the original; it operates on
        # 'data', and pandas isna() handles the object-dtype genotype columns
        missing_frac = data.iloc[:, 1:].isna().mean(axis=0).to_numpy()
        keep_mask = missing_frac <= max_missing_fraction
        cols_to_keep = [data.columns[0]] + data.columns[1:][keep_mask].tolist()
        dropped_cols = data.columns[1:][~keep_mask].tolist()
        if len(cols_to_keep) == 1:
            raise ValueError(
                f"All non-ID columns were dropped due to missing fraction > {max_missing_fraction}. "
                f"Dropped columns: {dropped_cols}"
            )
        data = data[cols_to_keep]
        print(f"Dropped columns (>{max_missing_fraction:.2f} missing): {dropped_cols}")

    labels = list(data.columns)
    geno_map={}

    if encoding == 'additive':
        data = data.astype(str).apply(additive_encoding)
        geno_map = {labels[i]:labels[i] for i in range(0,len(labels))}

    if encoding == 'add_quad':
        orig_columns = data.loc[:,data.columns!='ID'].columns
        new_df = data[data.loc[:,data.columns!='ID'].columns.repeat(2)]

        columns = list(new_df.columns)
        columns[::2]= [ x + "_a" for x in new_df.columns[::2]]
        columns[1::2]= [ x + "_b" for x in new_df.columns[1::2]]

        # map back to original columns 
        geno_map = {columns[i]:orig_columns[(i)//2] for i in range(0,len(columns))}

        new_df.columns = columns
        add_quad_encoding(new_df)
        # add back ID column
        new_df.insert(0,"ID",data['ID'])
        data = new_df

    return data, geno_map
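
The add_quad branch duplicates every genotype column and suffixes the copies with _a and _b. A self-contained sketch of that column-duplication pattern (add_quad_encoding itself is a module internal and is omitted):

import pandas as pd

data = pd.DataFrame({"ID": ["S1", "S2"], "snp1": [0, 2], "snp2": [1, 1]})

orig_columns = data.loc[:, data.columns != 'ID'].columns
new_df = data[orig_columns.repeat(2)]  # columns: snp1, snp1, snp2, snp2

columns = list(new_df.columns)
columns[::2]  = [c + "_a" for c in new_df.columns[::2]]
columns[1::2] = [c + "_b" for c in new_df.columns[1::2]]
new_df.columns = columns

print(list(new_df.columns))  # ['snp1_a', 'snp1_b', 'snp2_a', 'snp2_b']
print({c: orig_columns[i // 2] for i, c in enumerate(columns)})  # new -> original labels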

process_grammar_file(grammarfn, data)

Reads grammar file into string and adds all x variables present in dataframe

Parameters:

    grammarfn (str): grammar filename to read and modify. Required.
    data (DataFrame): dataset to be used with the grammar. Required.

Returns:

    updated_grammar (str): grammar text modified for the number of variables in data

Source code in src/athenage/utilities/data_processing.py
def process_grammar_file(grammarfn: str, data: pd.DataFrame) -> str:
    """Reads grammar file into string and adds all x variables present in dataframe

    Args:
        grammarfn: grammar filename to read and modify
        data: dataset to be used with the grammar

    Returns: 
        updated_grammar: grammar text modified for number of variables in data
    """
    with open(grammarfn, "r") as text_file:
        grammarstr = text_file.read()

    nvars = len([xcol for xcol in data.columns if 'x' in xcol])
    updated_grammar = ""
    for line in grammarstr.splitlines():
        if re.search(r"^\s*<v>", line):
            line = "<v> ::= " + ' | '.join([f"x[{i}]" for i in range(nvars)])
        updated_grammar += line + "\n"
    return updated_grammar
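
For example, with a three-variable dataset the <v> rule is rewritten to enumerate every variable (the grammar text is illustrative):

import tempfile
import pandas as pd

grammar = "<expr> ::= <v> + <v> | <v>\n<v> ::= x[0]\n"
df = pd.DataFrame({"x0": [0.0], "x1": [0.0], "x2": [0.0], "y": [0.0]})

with tempfile.NamedTemporaryFile("w", suffix=".bnf", delete=False) as f:
    f.write(grammar)
    gfn = f.name

print(process_grammar_file(gfn, df))
# <expr> ::= <v> + <v> | <v>
# <v> ::= x[0] | x[1] | x[2]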

process_var_colormap(colorfn=None, node_color='lightgray', var_default='white')

Create color map for graphical output of networks. The file format is tab-delimited, with columns in order: category, color, inputs.

Parameters:

    colorfn (str): name of file to process; when no filename is provided, only the network nodes (PA, PD, PM, PS, PAND, PNAND, POR, PXOR, PNOR) are included. Default: None.
    node_color (str): color for the operator nodes. Default: 'lightgray'.
    var_default (str): default color for unspecified variables. Default: 'white'.

Returns:

    color_map (ColorMapping): node name as key and color as value

Source code in src/athenage/utilities/data_processing.py
def process_var_colormap(colorfn: str=None, node_color: str='lightgray', 
    var_default: str='white') -> ColorMapping:
    """Create color map for graphical output of networks. Files format is tab-delimited
        in order of category, color, inputs

    Args:
        colorfn: name of file to process, when no fn provided only the network nodes
            (PA,PD,PM,PS,PAND,PNAND,POR,PXOR,PNOR) are included
        node_color: color for the operator nodes
        var_default: Default colors for unspecified variables

    Returns: 
        color_map: node name as key and color as value
    """
    color_map = ColorMapping(default_color=var_default, operator_color=node_color)

    color_map.add_category('netnodes', color_map.operator_color)
    color_map.add_nodes({op: color_map.operator_color
                         for op in ('PA', 'PD', 'PM', 'PS', 'PAND',
                                    'PNAND', 'POR', 'PNOR', 'PXOR')}, 'netnodes')

    # header for file is category,color,inputs
    if colorfn:
        with open(colorfn) as csv_file:
            # skip the header line (category, color, inputs)
            next(csv_file)
            reader = csv.reader(csv_file, delimiter='\t')

            for row in reader:
                if not row:
                    continue
                # set category color
                color_map.add_category(row[0],row[1])
                for in_var in row[2:]:
                    for var in in_var.split():
                        color_map.add_input(var,row[0],row[1])

    return color_map
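
A hedged usage sketch with an illustrative color file; ColorMapping is defined elsewhere in the package, and get_input_color is used here the same way write_plots uses it:

import tempfile

# tab-delimited columns: category, color, inputs (inputs are whitespace-separated)
color_file = "category\tcolor\tinputs\nclinical\tskyblue\tage bmi\ngenetic\tsalmon\trs123 rs456\n"
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
    f.write(color_file)
    cfn = f.name

cmap = process_var_colormap(cfn, node_color="lightgray", var_default="white")
print(cmap.get_input_color("age"))  # skyblue
print(cmap.get_input_color("bmi"))  # skyblue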

protected_variable_names(model, vmap)

Replace x variables with names in variable map ('-' removed for plotting)

Parameters:

    model (str): evolved model containing variables with indexed x values ('x[0]', 'x[1]', ...). Required.
    vmap (dict): dict with key as x variable and value as the name to replace it with. Required.

Returns:

    string (str): model string with variable names updated

Source code in src/athenage/utilities/data_processing.py
def protected_variable_names(model: str, vmap: dict) -> str:
    """Replace x variables with names in variable map ('-' removed for plotting)

    Args:
        model: evolved model containing variables with indexed x values ('x[0]', 'x[1]', ...)
        vmap: dict with key as x variable and value as name to replace with

    Returns:
        string: model string with variable names updated
    """
    return re.sub(r"(x\[\d+\])", lambda g: vmap[g.group(1)], model)
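
For example (the model string is illustrative):

vmap = {"x[0]": "rs123_a", "x[1]": "bmi"}
print(protected_variable_names("PA([x[0]*0.5, x[1]*1.2])", vmap))
# PA([rs123_a*0.5, bmi*1.2])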

read_input_files(outcomefn, genofn, continfn, out_scale=False, contin_scale=False, geno_encode=None, missing=None, outcome=None, missing_fract=1.0, included_vars=None)

Read in data and construct pandas dataframe

Parameters:

    outcomefn (str): Phenotypes (outcomes) filename. Required.
    genofn (str): SNP values filename. Required.
    continfn (str): any continuous data filename. Required.
    out_scale (bool): scale outcome values from 0 to 1.0. Default: False.
    contin_scale (bool): scale each continuous variable from 0 to 1.0. Default: False.
    geno_encode (str): encode genotype data; options are 'add_quad' and 'additive'. Default: None.
    outcome (str): column header in outcomefn to use for 'y'. Default: None.
    missing_fract (float): columns with >= this fraction of missing values will be dropped. Default: 1.0.
    included_vars (list[str]): list of variable names to include in the analysis; all others are excluded. Default: None.

Returns:

    dataset_df (DataFrame): pandas dataframe
    inputs_map (dict): dictionary with new label as key, original label as value
    unmatched (list): list of IDs that are not in all input files

Source code in src/athenage/utilities/data_processing.py
def read_input_files(outcomefn: str, genofn: str, continfn: str, out_scale: bool=False,
    contin_scale: bool=False, geno_encode: str=None, missing: str=None, outcome: str=None,
    missing_fract: float=1.0,included_vars: list[str]=None) -> tuple[pd.DataFrame, dict, list]:
    """Read in data and construct pandas dataframe

    Args:
        outcomefn: Phenotypes (outcomes) filename
        genofn: SNP values filename
        continfn: any continuous data filename
        out_scale: scale outcome values from 0 to 1.0
        contin_scale: scale each continuous variable from 0 to 1.0
        geno_encode: encode genotype data. options are 'add_quad' and 'additive'
        outcome: column header in outcomefn to use for 'y'
        missing_fract: columns with >= fraction of missing will be dropped
        included_vars: list of variable names to include in analysis; all others excluded

    Returns:
        dataset_df: pandas dataframe
        inputs_map: dictionary with new label as key, original label as value
        unmatched: list of IDs that are not in all input files
    """

    y_df = process_continfile(outcomefn, out_scale)

    if outcome is None:
        dataset_df = y_df[['ID',y_df.columns[1]]]
    else:
        dataset_df = y_df[['ID',outcome]]

    dataset_df.columns = ['ID', 'y']

    if included_vars:
        included_vars.insert(0, 'ID')

    contin_df = None
    inputs_map = {}
    if continfn:
        contin_df = process_continfile(continfn, contin_scale, missing, included_vars, missing_fract)
        inputs_map={contin_df.columns[i]:contin_df.columns[i] for i in range(0,len(contin_df.columns))}

    if genofn:
        geno_df, geno_map = process_genofile(genofn, geno_encode, missing, included_vars)
        inputs_map.update(geno_map)

    dataset_df = dataset_df.sort_values('ID', ascending=False)
    unmatched = []

    # merge genetic SNP info into dataset dataframe
    if genofn:
        dataset_df['ID'] = dataset_df['ID'].astype(str)
        geno_df['ID'] = geno_df['ID'].astype(str)

        # Symmetric difference = IDs in one file but not the other
        unmatched_ids = set(dataset_df['ID']) ^ set(geno_df['ID'])
        unmatched.extend(unmatched_ids)
        # Merge (the original merged contin_df and deleted an undefined 'gene_df'
        # here; geno_df is the intended frame in both places)
        dataset_df = dataset_df.merge(geno_df, on="ID", validate="1:1", copy=False)

        del geno_df
        gc.collect()

    # merge continuous input values into dataset frame
    if continfn:
        dataset_ids = dataset_df['ID'].astype(str)
        contin_ids  = contin_df['ID'].astype(str)

        # Symmetric difference = IDs in one but not the other
        unmatched_ids = set(dataset_ids) ^ set(contin_ids)
        unmatched.extend(unmatched_ids)
        # Merge 
        dataset_df = dataset_df.merge(contin_df, on="ID", validate="1:1", copy=False)

        del contin_df
        gc.collect()

    dataset_df.drop(columns=['ID'], inplace=True)

    return dataset_df, inputs_map, unmatched
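
A hedged usage sketch; the file names are hypothetical, and each file is expected to be whitespace-delimited with an ID column:

dataset_df, inputs_map, unmatched = read_input_files(
    outcomefn="pheno.txt",   # outcomes, one per ID
    genofn="snps.txt",       # SNP genotypes
    continfn="covars.txt",   # continuous covariates
    contin_scale=True,
    geno_encode="additive",
    missing="NA")

print(dataset_df.columns[:5])  # 'y' plus the merged input columns
print(unmatched)               # IDs absent from at least one file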

rename_variables(df)

Rename variables in dataframe to be indexed version of x

Parameters:

    df (DataFrame): dataframe to alter. Required.

Returns:

    vmap (dict): new names are keys and original names are values (with '-' replaced by '_' to work with plotting)
    origvmap (dict): adjusted names as keys and original column names as values

Source code in src/athenage/utilities/data_processing.py
def rename_variables(df: pd.DataFrame) -> tuple[dict, dict]:
    """ Rename variables in dataframe to be indexed version of x

    Args:
        df: dataframe to alter

    Returns:
        vmap: new names are keys and original names are values (with - replaced by _ to work with plotting)
        origvmap: adjusted names as keys and original column names as values

    """
    newcols = {}
    vmap = {}
    origvmap = {}
    oldcols = list(df.drop('y', axis=1).columns)
    for i in range(len(oldcols)):
        newvar = 'x' + str(i)
        newcols[oldcols[i]] = newvar
        vmap['x[' + str(i) + ']'] = oldcols[i].replace("-", "_")
        origvmap[oldcols[i].replace("-", "_")] = oldcols[i]

    df.rename(newcols, inplace=True, axis=1)

    return vmap, origvmap
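
For example:

import pandas as pd

df = pd.DataFrame({"rs-123": [0, 1], "bmi": [22.5, 30.1], "y": [0, 1]})
vmap, origvmap = rename_variables(df)

print(list(df.columns))  # ['x0', 'x1', 'y']
print(vmap)              # {'x[0]': 'rs_123', 'x[1]': 'bmi'}
print(origvmap)          # {'rs_123': 'rs-123', 'bmi': 'bmi'}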

reset_variable_names(model, vmap, orig_vmap)

Replace x variables with names in variable map

Parameters:

    model (str): evolved model containing variables with indexed x values ('x[0]', 'x[1]', ...). Required.
    vmap (dict): dict with key as x variable and value as the name to replace it with. Required.
    orig_vmap (dict): dict with key as protected variable name ('-' removed) and value as the original name. Required.

Returns:

    string (str): model string with variable names updated

Source code in src/athenage/utilities/data_processing.py
def reset_variable_names(model: str, vmap: dict, orig_vmap: dict) -> str:
    """Replace x variables with names in variable map

    Args:
        model: evolved model containing variables with indexed x values ('x[0]', 'x[1]', ...)
        vmap: dict with key as x variable and value as name to replace with
        orig_vmap: dict with key as protected variable name ('-' removed) and the original name

    Returns: 
        string: model string with variable names updated
    """
    return re.sub(r"(x\[\d+\])", lambda g: orig_vmap[vmap[g.group(1)]], model)
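
Continuing the rename_variables example above, the two maps restore the original names (the model string is illustrative):

vmap = {"x[0]": "rs_123", "x[1]": "bmi"}
orig_vmap = {"rs_123": "rs-123", "bmi": "bmi"}

print(reset_variable_names("x[0] + pdiv(x[1], 2.0)", vmap, orig_vmap))
# rs-123 + pdiv(bmi, 2.0)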

write_plots(basefn, best_models, var_map, orig_var_map, inputs_map, color_map)

Produces png file displaying best models with one per cross-validation.

Parameters:

    basefn (str): name of file to write. Required.
    best_models (list[Individual]): deap Individual objects from the run. Required.
    var_map (dict): key is value (x[0], x[1], etc.) and value is the name from the dataset adjusted for multiple occurrences (Ott encoding). Required.
    orig_var_map (dict): key is protected variable name ('-' removed) and value is the original name in the input. Required.
    inputs_map (dict): key is name (adjusted for Ott encoding), value is the original column name in the input dataset. Required.
    color_map (ColorMapping): contains colors to use in the plot. Required.

Returns:

    None

Source code in src/athenage/utilities/data_processing.py
def write_plots(basefn: str, best_models: list['deap.creator.Individual'], var_map: dict, orig_var_map: dict,
                inputs_map: dict, color_map: ColorMapping) -> None:
    """Produces png file displaying best models with one per cross-validation.

    Parameters:
        basefn: name of file to write
        best_models: deap Individual objects from run
        var_map: key is value (x[0],x[1],etc) and value is name from dataset adjusted for multiple occurrences (Ott encoding)
        orig_var_map: key is protected variable name ('-' removed) and value is original name in input
        inputs_map: key is name (adjusted for Ott encoding), value is original column name in input dataset
        color_map: contains colors to use in plot

    Returns: 
        None
    """

    inputs_map.update({'PA':'PA', 'PM':'PM', 'PS':'PS','PD':'PD','PAND':'PAND','PNAND':'PNAND',
                       'POR':'POR','PNOR':'PNOR','PXOR':'PXOR'})
    for cv,model in enumerate(best_models,1):
        compressed = compress_weights(model.phenotype)
        modelstr = protected_variable_names(compressed, var_map)
        nodes = construct_nodes(modelstr)
        finalindex = len(nodes)-1
        node_labels={}
        edge_labels={}
        node_colors={}
        categories = set()
        node_size=8

        for node in nodes:
            node.num = abs(node.num - finalindex)
            node_labels[node.num] = node.label
            if node.label in orig_var_map:
                node_labels[node.num] = orig_var_map[node.label]
            # possible colors: https://matplotlib.org/stable/users/explain/colors/colors.html
            if node.label in inputs_map:
                node_colors[node.num] = color_map.get_input_color(inputs_map[node.label])
            else:
                node_colors[node.num] = color_map.get_input_color(node.label)

            if node_colors[node.num] != color_map.default_color and \
                node_colors[node.num]  != color_map.operator_color:
                categories.add(color_map.inputs[inputs_map[node.label]].category)

            if node.to is not None:
                node.to = abs(node.to - finalindex)
                if node.weight:
                    edge_labels[(node.num,node.to)]="{weight:.2f}".format(weight=float(node.weight))

        edges = []
        for node in nodes:
            if node.to is not None:
                edges.append((node.num,node.to))
        plt.clf()
        fig, ax = plt.subplots()
        Graph(edges, node_layout='dot', arrows=True, node_labels=node_labels,
            edge_labels=edge_labels, node_color=node_colors, node_size=node_size, ax=ax,
            scale=(2, 2), edge_label_fontdict=dict(size=6), node_label_fontdict=dict(size=4))

        if len(categories) > 0:
            # add legend
            node_proxy_artists = []
            for cat in categories:
                proxy = plt.Line2D(
                    [], [],
                    linestyle='None',
                    color=color_map.get_category_color(cat),
                    marker='s',
                    markersize=node_size,
                    label=cat
                )
                node_proxy_artists.append(proxy)

            node_legend = ax.legend(handles=node_proxy_artists, loc='lower left', fontsize=7)
            ax.add_artist(node_legend)

        outputfn = basefn + ".cv" + str(cv) + ".png"
        plt.title("\n".join(textwrap.wrap(modelstr, 80)), fontsize=8)
        plt.savefig(outputfn, dpi=300)
        plt.close(fig)  # release the figure before the next CV iteration

write_summary(filename, best_models, score_type, var_map, orig_var_map, fitness_test, nmissing)

Produce summary file reporting results

Parameters:

    filename (str): name of file to write. Required.
    best_models (list[Individual]): deap Individual objects from the run. Required.
    score_type (str): test used for scoring individuals. Required.
    var_map (dict): key is value (x[0], x[1], etc.) and value is the original column name in the dataset. Required.
    orig_var_map (dict): key is protected variable name ('-' removed) and value is the original name in the input. Required.
    fitness_test (list[float]): testing fitness scores for each individual. Required.
    nmissing (list): per-individual (training, testing) fractions of missing rows. Required.

Returns:

    None

Source code in src/athenage/utilities/data_processing.py
def write_summary(filename: str, best_models: list['deap.creator.Individual'], score_type: str, var_map: dict, 
                  orig_var_map: dict, fitness_test: list[float], nmissing: list) -> None:
    """Produce summary file reporting results

    Args:
        filename: name of file to write
        best_models: deap Individual objects from run
        score_type: test used for scoring individuals
        var_map: key is value (x[0],x[1],etc) and value is original column name in dataset
        orig_var_map: key is protected variable name ('-' removed) and value is original name in input
        fitness_test: contains testing fitness scores for each individual
        nmissing: per-individual (training, testing) fractions of missing rows

    Returns: 
        None
    """

    header = f"CV\tVariables\t{score_type} Training\tTesting\tTraining-missing\tTesting-missing\n"

    fh = open(filename, "w")
    fh.write(header)

    pattern = re.compile(r"(x\[\d+\])")


    for i,model in enumerate(best_models):
        fh.write(f"{i+1}\t")
        # extract variables from model
        for match in pattern.finditer(model.phenotype):
            fh.write(f"{orig_var_map[var_map[match.group(1)]]} ")

        fh.write(f"\t{model.fitness.values[0]}")
        fh.write(f"\t{fitness_test[i]}")
        fh.write(f"\t{nmissing[i][0] * 100:.2f}%")
        fh.write(f"\t{nmissing[i][1] * 100:.2f}%")
        fh.write("\n")


    fh.write("\nCV\tModel\n")
    for i,model in enumerate(best_models):
        compressed = compress_weights(model.phenotype)
        compressed = reset_variable_names(compressed, var_map, orig_var_map)
        fh.write(f"{i+1}\t{compressed}\n")

    fh.write("\n***** Original Networks *****")
    fh.write("\nCV\tModel\n")
    for i,model in enumerate(best_models):
        fh.write(f"{i+1}\t{model.phenotype}\n")

    fh.close()