
Commit 73adf6a

BlackPoint-CX authored and weiguoz committed
Implement cluster model by subclassing of tensorflow.keras (#16)
* WIP: Implement cluster
* WIP: Implement cluster by subclassing of tensorflow.keras
* Implement Deep Embedding Cluster Model.
* Update testcase of deep_embedding_cluster; change split logic when calling train_on_batch.
* Update setup.py for requirement of package scikit-learn; rename function cluster_train_loop to sqlflow_train_loop.
* Update setup.py for specified version of scikit-learn.
* Update requirements of packages: numpy, pandas.
* Update setup.py: change version of tensorflow to 2.0.0b1, same as sqlflow.
* Add parameters epochs and verbose to sqlflow_train_loop to meet the form of custom models.
1 parent 4cf9413 commit 73adf6a

File tree

4 files changed: +398 −1 lines changed


setup.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@

 # What packages are required for this module to be executed?
 REQUIRED = [
-    'tensorflow==2.0.0-alpha0'
+    'tensorflow==2.0.0b1', 'scikit-learn==0.20.0', 'numpy==1.16.2', 'pandas==0.25.1'
 ]
 SETUP_REQUIRED = [
     'pytest-runner'

sqlflow_models/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 from ._version import __version__
 from .dnnclassifier import DNNClassifier
 from .lstmclassifier import StackedBiLSTMClassifier
+from .deep_embedding_cluster import DeepEmbeddingClusterModel
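
With this export in place, the new model is available at the package top level alongside the existing classifiers. A minimal import sketch (illustrative, not part of this diff):

    from sqlflow_models import DeepEmbeddingClusterModel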

sqlflow_models/deep_embedding_cluster.py

Lines changed: 296 additions & 0 deletions

@@ -0,0 +1,296 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
__author__ : chenxiang
__email__ : alfredchenxiang@didichuxing.com
__file_name__ : deep_embedding_cluster.py
__create_time__ : 2019/09/03
"""
from datetime import datetime
import tensorflow as tf
from tensorflow.python import keras
from tensorflow.python.data import make_one_shot_iterator
from tensorflow.python.feature_column.feature_column_v2 import DenseFeatures
from tensorflow.python.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.python.keras.layers import Dense, Layer
from tensorflow.python.keras import backend
import numpy as np
from sklearn.cluster import KMeans
from tensorflow.python.keras.optimizer_v2.gradient_descent import SGD
import pandas as pd

class DeepEmbeddingClusterModel(keras.Model):

    def __init__(self,
                 feature_columns,
                 n_clusters=10,
                 kmeans_init=20,
                 run_pretrain=True,
                 existed_pretrain_model=None,
                 pretrain_dims=None,
                 pretrain_activation_func='relu',
                 pretrain_batch_size=256,
                 train_batch_size=256,
                 pretrain_epochs=1,
                 pretrain_initializer='glorot_uniform',
                 train_max_iters=1000,
                 update_interval=100,
                 tol=0.001,
                 loss=None):
        """
        Implement a clustering model based largely on DEC (Deep Embedded Clustering).
        :param feature_columns: Feature columns that define the input layer.
        :param n_clusters: Number of clusters.
        :param kmeans_init: Number of K-Means runs used to pick the best initial centroids.
        :param run_pretrain: Whether to run the pre-train process.
        :param existed_pretrain_model: Path of an existing pre-trained model. Not used yet.
        :param pretrain_dims: Dimensions of the layers used to build the autoencoder.
            Required by the current implementation.
        :param pretrain_activation_func: Activation function of the autoencoder layers.
        :param pretrain_batch_size: Batch size during pre-training.
        :param train_batch_size: Batch size during training.
        :param pretrain_epochs: Number of epochs during pre-training.
        :param pretrain_initializer: Kernel initializer for the autoencoder layers.
        :param train_max_iters: Maximum number of training iterations.
        :param update_interval: Number of iterations between updates of the target distribution.
        :param tol: Early-stop tolerance: training stops once the fraction of samples whose
            cluster assignment changed since the last update falls below this value.
        :param loss: Loss function. Defaults to 'kld' (KL divergence).
        """
        super(DeepEmbeddingClusterModel, self).__init__(name='DECModel')

        # Common
        self._feature_columns = feature_columns
        self._n_clusters = n_clusters
        self._default_loss = loss if loss else 'kld'
        self._train_max_iters = train_max_iters
        self._train_batch_size = train_batch_size
        self._update_interval = update_interval
        self._current_interval = 0
        self._tol = tol

        # Pre-train
        self._run_pretrain = run_pretrain
        self._existed_pretrain_model = existed_pretrain_model
        self._pretrain_activation_func = pretrain_activation_func
        self._pretrain_batch_size = pretrain_batch_size
        self._pretrain_dims = pretrain_dims
        self._pretrain_epochs = pretrain_epochs
        self._pretrain_initializer = pretrain_initializer
        self._pretrain_optimizer = SGD(lr=1, momentum=0.9)

        # K-Means
        self._kmeans_init = kmeans_init

        # Cluster
        self._cluster_optimizer = SGD(lr=0.01, momentum=0.9)

        # Build model
        self._n_stacks = len(self._pretrain_dims)
        self.input_layer = DenseFeatures(feature_columns)

        # Layers - encoder
        self.encoder_layers = []
        for i in range(self._n_stacks - 1):
            self.encoder_layers.append(Dense(units=self._pretrain_dims[i + 1],
                                             activation=self._pretrain_activation_func,
                                             name='encoder_%d' % i))

        self.encoder_layers.append(Dense(units=self._pretrain_dims[-1],
                                         kernel_initializer=self._pretrain_initializer,
                                         name='encoder_%d' % (self._n_stacks - 1)))

        self.clustering_layer = ClusteringLayer(name='clustering', n_clusters=self._n_clusters)

    def default_optimizer(self):
        return self._cluster_optimizer

    def default_loss(self):
        return self._default_loss

    @staticmethod
    def target_distribution(q):
        """
        Calculate the auxiliary (sharpened) target distribution by raising q to the
        second power and then normalizing by the per-cluster frequency.
        :param q: Original soft-assignment distributions.
        :return: Auxiliary sharpened target distributions.
        """
        weight = q ** 2 / q.sum(0)
        return (weight.T / weight.sum(1)).T

    def pre_train(self, x):
        """
        Prepare the encoder part, either by loading a ready-to-go model or by training one.
        :param x: Dataset of (feature_dict, label) pairs.
        :return:
        """
        print('{} Start pre_train.'.format(datetime.now()))

        # Concatenate the input features so the dataset yields (features, target) pairs,
        # as required by keras.Model.fit(): the autoencoder reconstructs its own input.
        def _concate_generate(dataset_element, label):
            concate_y = tf.stack([dataset_element[feature.key] for feature in self._feature_columns], axis=1)
            return (dataset_element, concate_y)

        y = x.map(map_func=_concate_generate)
        y = y.prefetch(1)
        print('{} Finished dataset transform.'.format(datetime.now()))

        # Layers - decoder
        self.decoder_layers = []
        for i in range(self._n_stacks - 1, 0, -1):
            self.decoder_layers.append(Dense(units=self._pretrain_dims[i],
                                             activation=self._pretrain_activation_func,
                                             kernel_initializer=self._pretrain_initializer,
                                             name='decoder_%d' % i))

        self.decoder_layers.append(Dense(units=self._pretrain_dims[0],
                                         kernel_initializer=self._pretrain_initializer,
                                         name='decoder_0'))

        # Pre-train: the autoencoder and the encoder share the encoder layers, so
        # training the autoencoder also initializes the encoder weights.
        self._autoencoder = keras.Sequential(layers=[self.input_layer] + self.encoder_layers + self.decoder_layers,
                                             name='autoencoder')
        self._autoencoder.compile(optimizer=self._pretrain_optimizer, loss='mse')
        # encoder
        self._encoder = keras.Sequential(layers=[self.input_layer] + self.encoder_layers, name='encoder')
        self._encoder.compile(optimizer=self._pretrain_optimizer, loss='mse')

        callbacks = [
            EarlyStopping(monitor='loss', patience=2, min_delta=0.001),
            ReduceLROnPlateau(monitor='loss', factor=0.1, patience=2)
        ]
        print('{} Training auto-encoder.'.format(datetime.now()))
        self._autoencoder.fit_generator(generator=y, epochs=self._pretrain_epochs, callbacks=callbacks)

        # encoded_input: numpy.ndarray of shape (num_records, encoder output width),
        # e.g. (70000, 10) for MNIST.
        print('{} Calculating encoded_input.'.format(datetime.now()))
        self.encoded_input = self._encoder.predict(x)

        del self._autoencoder
        del self._encoder
        del self.decoder_layers
        print('{} Done pre-train.'.format(datetime.now()))

    def call(self, inputs, training=None, mask=None):
        x = self.input_layer(inputs)
        for encoder_layer in self.encoder_layers:
            x = encoder_layer(x)
        return self.clustering_layer(x)

    def init_centroids(self):
        """
        Run K-Means `_kmeans_init` times on the encoder output to get the best initial centroids.
        :return:
        """
        self.kmeans = KMeans(n_clusters=self._n_clusters, n_init=self._kmeans_init)
        self.y_pred_last = self.kmeans.fit_predict(self.encoded_input)
        print('{} Done init centroids by k-means.'.format(datetime.now()))

    def sqlflow_train_loop(self, x, epochs=1, verbose=0):
        """ Parameters `epochs` and `verbose` are not used in this function; they only
        keep the signature consistent with other custom models. """
        # Preparation: run fit() on a single batch so that Keras builds the
        # variables of this subclassed model before pre-training.
        ite = make_one_shot_iterator(x)
        features, labels = ite.get_next()
        self.fit(x=features, y=labels)

        # Pre-train the autoencoder to prepare the weights of the encoder layers.
        self.pre_train(x)

        # Initialize centroids for clustering.
        self.init_centroids()

        # Set the weights of the clustering layer to the K-Means centroids.
        self.get_layer(name='clustering').set_weights([self.kmeans.cluster_centers_])

        # Train
        print('{} Start preparing training dataset.'.format(datetime.now()))
        all_records = {}
        for (feature_dict, label) in x:  # types: dict and EagerTensor
            for feature_name, feature_series in feature_dict.items():  # types: str and EagerTensor
                if feature_name in all_records:
                    all_records[feature_name] = np.concatenate([all_records[feature_name], feature_series])
                else:
                    all_records[feature_name] = feature_series

        all_records_df = pd.DataFrame.from_dict(all_records)
        all_records_ndarray = all_records_df.values
        record_num, feature_num = all_records_df.shape
        print('{} Done preparing training dataset.'.format(datetime.now()))

        index_array = np.arange(record_num)
        index, loss, p = 0, 0., None
        for ite in range(self._train_max_iters):
            if ite % self._update_interval == 0:
                q = self.predict(all_records)  # numpy.ndarray of shape (record_num, n_clusters)
                p = self.target_distribution(q)  # update the auxiliary target distribution p
                y_pred = q.argmax(1)
                # delta_percentage is the fraction of predictions that changed since the last update.
                delta_percentage = np.sum(y_pred != self.y_pred_last).astype(np.float32) / y_pred.shape[0]
                print('{} Updating at iter: {} -> delta_percentage: {}.'.format(datetime.now(), ite, delta_percentage))
                self.y_pred_last = np.copy(y_pred)
                if ite > 0 and delta_percentage < self._tol:
                    print('Early stopping since delta_percentage {} has reached tol {}'.format(delta_percentage, self._tol))
                    break
            idx = index_array[index * self._train_batch_size: min((index + 1) * self._train_batch_size, record_num)]
            loss = self.train_on_batch(x=list(all_records_ndarray[idx].T), y=p[idx])
            if ite % 100 == 0:
                print('{} Training at iter:{} -> loss:{}.'.format(datetime.now(), ite, loss))
            index = index + 1 if (index + 1) * self._train_batch_size <= record_num else 0  # Update index

    @staticmethod
    def prepare_prediction_column(prediction):
        """ Return the cluster label with the highest probability. """
        return prediction.argmax(axis=-1)

    def display_model_info(self, verbose=0):
        if verbose >= 0:
            print('Summary : ')
            self.summary()
        if verbose >= 1:
            print('Layer\'s Info : ')
            for layer in self.encoder_layers:
                print(layer.name + ' : ')
                print(layer.get_weights())
            # Cluster
            print(self.clustering_layer.name + ' : ')
            print(self.clustering_layer.get_weights())


class ClusteringLayer(Layer):
    def __init__(self, n_clusters, alpha=1.0, **kwargs):
        """
        Clustering layer that refines the cluster centroids by learning from the current
        high-confidence assignments using the auxiliary target distribution.

        Initial cluster centroids are set externally via `set_weights`.
        :param n_clusters: Number of clusters.
        :param alpha: Degrees of freedom of the Student's t-distribution. Default 1.0 for all experiments.
        :param kwargs:
        """
        self.n_clusters = n_clusters
        self.alpha = alpha
        super(ClusteringLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        input_dim = input_shape[1]
        shape = tf.TensorShape(dims=(self.n_clusters, input_dim))
        self.kernel = self.add_weight(name='kernel', shape=shape, initializer='glorot_uniform', trainable=True)
        super(ClusteringLayer, self).build(input_shape)

    def call(self, inputs, **kwargs):
        # Soft assignment: Student's t-kernel similarity between each embedded point
        # and each centroid, normalized over clusters.
        q = 1.0 / (1.0 + (backend.sum(backend.square(backend.expand_dims(inputs, axis=1) - self.kernel),
                                      axis=2) / self.alpha))
        q **= (self.alpha + 1.0) / 2.0
        q = backend.transpose(backend.transpose(q) / backend.sum(q, axis=1))
        return q

    def compute_output_shape(self, input_shape):
        assert input_shape and len(input_shape) == 2
        return input_shape[0], self.n_clusters

    def get_config(self):
        config = {'n_clusters': self.n_clusters}
        base_config = super(ClusteringLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
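
For reference, a minimal sketch of how this model is meant to be driven end to end. The feature names, dataset values, and `pretrain_dims` below are hypothetical examples, not part of this commit; in production, SQLFlow constructs the feature columns and dataset and calls `sqlflow_train_loop` itself:

    import tensorflow as tf
    from sqlflow_models import DeepEmbeddingClusterModel

    # Hypothetical numeric features; SQLFlow normally derives these from the SQL statement.
    feature_columns = [tf.feature_column.numeric_column(name) for name in ('f1', 'f2', 'f3')]

    # Hypothetical dataset of (feature_dict, label) pairs; the label is not used
    # by the clustering objective.
    dataset = tf.data.Dataset.from_tensor_slices((
        {'f1': [0.1, 0.5, 0.9, 0.2],
         'f2': [1.0, 0.2, 0.4, 0.8],
         'f3': [0.3, 0.7, 0.1, 0.6]},
        [0.0, 0.0, 0.0, 0.0],
    )).batch(2)

    model = DeepEmbeddingClusterModel(feature_columns=feature_columns,
                                      n_clusters=2,
                                      pretrain_dims=[3, 8, 2],  # input dim -> hidden -> embedding
                                      pretrain_epochs=1,
                                      train_max_iters=10,
                                      update_interval=5)
    model.compile(optimizer=model.default_optimizer(), loss=model.default_loss())
    model.sqlflow_train_loop(dataset)

    # Hard cluster labels from the soft assignments.
    q = model.predict({'f1': [0.3], 'f2': [0.6], 'f3': [0.2]})
    labels = DeepEmbeddingClusterModel.prepare_prediction_column(q)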

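The two distributions at the heart of DEC can also be sanity-checked in isolation. A small numpy sketch (toy values) of the Student's t soft assignment computed by ClusteringLayer.call and the sharpened target produced by target_distribution:

    import numpy as np

    # Toy embeddings (3 points, 2 dims) and centroids (2 clusters, 2 dims).
    z = np.array([[0.0, 0.0], [1.0, 1.0], [0.9, 1.1]])
    mu = np.array([[0.0, 0.0], [1.0, 1.0]])
    alpha = 1.0

    # Soft assignment, mirroring ClusteringLayer.call:
    # q_ij is proportional to (1 + ||z_i - mu_j||^2 / alpha) ** -((alpha + 1) / 2).
    q = 1.0 / (1.0 + np.sum((z[:, None, :] - mu[None, :, :]) ** 2, axis=2) / alpha)
    q **= (alpha + 1.0) / 2.0
    q = (q.T / q.sum(axis=1)).T

    # Sharpened target, mirroring DeepEmbeddingClusterModel.target_distribution:
    # p_ij is proportional to q_ij^2 / f_j with f_j = sum_i q_ij, then row-normalized.
    weight = q ** 2 / q.sum(0)
    p = (weight.T / weight.sum(1)).T

    print(q.round(3))  # soft assignments; rows sum to 1
    print(p.round(3))  # sharper than q: confident assignments are reinforced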