-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontact_map_processing.py
More file actions
114 lines (80 loc) · 4.27 KB
/
contact_map_processing.py
File metadata and controls
114 lines (80 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
__doc__ = """Processing contact map and something for DMoN if needed"""
import numpy as np
import pandas as pd
import scipy.sparse as sparse
import os
import re
from tqdm import tqdm
from scipy.sparse import base
from collections import defaultdict
from sklearn.preprocessing import LabelEncoder
import subprocess
import logging
logger = logging.getLogger(__name__)
class ContactMap:
    """Load a Hi-C contact-map table and convert it to sparse matrices for DMoN.

    The input is a ``.tsv``/``.csv`` table with one row per contig pair; the
    file extension selects the delimiter and the score column is rescaled on
    load (log by default).
    """

    # File extension -> pandas delimiter.
    DELIMITER = {"tsv": "\t", "csv": ","}
    # Scaling-method name -> function applied element-wise to the score column.
    SCALING_FUNC = {"log": np.log, "sqrt": np.sqrt, None: lambda x: x}

    def __init__(self, path, feature_columns=None, node_1_column="FirstName",
                 node_2_column="SecondName",
                 score_column="SpadesScore",
                 scaling_method="log"):
        """
        :param path: path to the contact-map table; the file extension
            ("tsv"/"csv") selects the delimiter
        :param feature_columns: optional list of per-row feature columns;
            even-indexed entries describe the first node, odd-indexed entries
            the second node
        :param node_1_column: column holding the first contig name
        :param node_2_column: column holding the second contig name
        :param score_column: column holding the Hi-C link score
        :param scaling_method: key into SCALING_FUNC ("log", "sqrt" or None)
        :raises KeyError: if the extension or scaling method is unknown
        """
        self.path = path
        self.node_1 = node_1_column
        self.node_2 = node_2_column
        self.feature_columns = feature_columns or []
        self.score_column = score_column
        self.scaling_func = scaling_method
        # Populated by fit_encoder() below.
        self.contigs_with_hic = None      # set of contig names seen in either node column
        self.SHAPE = None                 # (n_contigs, n_contigs) adjacency shape
        self.nuinque_hic_contigs = None   # number of unique contigs (name kept as-is [sic] for caller compatibility)
        self.encoder = LabelEncoder()
        self.data = pd.read_csv(path, delimiter=ContactMap.DELIMITER[path.split(".")[-1]])
        logger.info(f"Scaling {self.score_column} by {self.scaling_func} scaling function")
        self.data[self.score_column] = self.data[self.score_column].apply(
            ContactMap.SCALING_FUNC[self.scaling_func])
        self.fit_encoder(node_1_column, node_2_column)

    def fit_encoder(self, node_1, node_2):
        """Fit the label encoder on every contig name and record count/shape.

        :param node_1: column name of the first contig
        :param node_2: column name of the second contig
        """
        logger.info("Fitting encoder...")
        unique_features = set(self.data[node_1].unique()) | set(self.data[node_2].unique())
        self.encoder.fit(list(unique_features))
        unique_features_amount = len(unique_features)
        self.nuinque_hic_contigs = unique_features_amount
        self.contigs_with_hic = unique_features
        self.SHAPE = (unique_features_amount, unique_features_amount)
        logger.info(f"Fitting complete! Found {self.nuinque_hic_contigs} contigs with Hi-C links")

    def get_sparse_adjacency_feature_matrices(self, threshold=0):
        """Build the adjacency and node-feature matrices for DMoN.

        NOTE(review): this mutates ``self.data`` in place (the node columns
        are label-encoded), so it is only safe to call once per instance.

        :param threshold: scaled score a pair must reach (>=) to get an edge;
            pairs below it still occupy a stored (zero) entry
        :return: (sparse adjacency, sparse feature matrix), both CSR
        """
        # FIX: build empty sparse matrices directly instead of materialising a
        # dense n*n np.zeros array first (the old code's "!!!" hotspot).
        # self.SHAPE equals the (h, h) the old code recomputed from self.data.
        sparse_adjacency = sparse.lil_matrix(self.SHAPE)
        # FIX: ceil-divide so an odd number of feature columns cannot index
        # past the matrix width (len // 2 truncated and raised before).
        n_features = max((len(self.feature_columns) + 1) // 2, 1)
        sparse_feature_matrix = sparse.lil_matrix((self.nuinque_hic_contigs, n_features))
        self.data[self.node_1] = self.encoder.transform(self.data[self.node_1].values)
        self.data[self.node_2] = self.encoder.transform(self.data[self.node_2].values)
        visited_pairs = set()
        for _, row in tqdm(self.data.iterrows(), total=self.data.shape[0]):
            node_1_id = row[self.node_1]
            node_2_id = row[self.node_2]
            # Only the first occurrence of an unordered pair contributes.
            if visited_pairs & {(node_1_id, node_2_id), (node_2_id, node_1_id)}:
                continue
            visited_pairs |= {(node_1_id, node_2_id), (node_2_id, node_1_id)}
            sparse_adjacency[node_1_id, node_2_id] = sparse_adjacency[node_2_id, node_1_id] = int(
                row[self.score_column] >= threshold)
            for feature_index, feature in enumerate(self.feature_columns[::2]):  # First node
                sparse_feature_matrix[node_1_id, feature_index] = row[feature]
            for feature_index, feature in enumerate(self.feature_columns[1::2]):  # Second node
                sparse_feature_matrix[node_2_id, feature_index] = row[feature]
        sparse_adjacency = sparse.csr_matrix(sparse_adjacency)
        sparse_feature_matrix = sparse.csr_matrix(sparse_feature_matrix)
        return sparse_adjacency, sparse_feature_matrix