
Keras: Adding an MDN layer to an LSTM network

  • duhaime  ·  asked 6 years ago

    In short, my question is: given dance-sequence training data, is the Long Short-Term Memory network detailed below appropriately designed to generate new dance sequences?

    Background: I'm working with a dancer who wants to use a neural network to generate new dance sequences. She sent me the 2016 chor-rnn paper, which accomplished this task using an LSTM network with a Mixture Density Network layer at the end. After adding an MDN layer to my LSTM network, however, my loss goes negative and the results look chaotic. This may be because the training data is very small, but I'd like to validate the fundamentals of the model before scaling up the training data. If anyone can see whether the model below overlooks something fundamental (which is very possible), I'd be very grateful for their feedback.
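
    (One note on the negative loss: the MDN loss is the negative log of a probability density, and a density can exceed 1, so negative loss values can occur even in a correctly wired MDN. A minimal numpy demonstration:)

    import numpy as np
    
    # a probability density can exceed 1, so its negative log can be negative:
    # e.g. a gaussian with sigma = 0.05 evaluated at its mean
    pdf = 1.0 / (0.05 * np.sqrt(2 * np.pi)) # ~7.98
    print(-np.log(pdf)) # ~-2.08: a negative "loss" from a valid density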

    My input data X (pictured below) has shape (626, 55, 3), corresponding to 626 time snapshots of 55 body positions, each with 3 coordinates (x, y, then z). So X[1][11][2] is the z position of the 11th body part at time 1:

    import requests
    import numpy as np
    
    # download the data and write it to disk so np.load can read it
    r = requests.get('https://s3.amazonaws.com/duhaime/blog/dancing-with-robots/dance.npy')
    with open('dance.npy', 'wb') as out:
      out.write(r.content)
    
    # X.shape = time_intervals, n_body_parts, 3
    X = np.load('dance.npy')
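
    As a quick check on the shape and the indexing convention described above:

    assert X.shape == (626, 55, 3) # time, body part, coordinate
    print(X[1][11][2]) # z position of body part 11 at time 1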
    

    To make sure the data was extracted correctly, I visualize the first few frames:

    import mpl_toolkits.mplot3d.axes3d as p3
    import matplotlib.pyplot as plt
    from IPython.display import HTML
    from matplotlib import animation
    import matplotlib
    
    matplotlib.rcParams['animation.embed_limit'] = 2**128
    
    def update_points(time, points, X):
      arr = np.array([[ X[time][i][0], X[time][i][1] ] for i in range(int(X.shape[1]))])
      points.set_offsets(arr) # set x, y values
      points.set_3d_properties(X[time][:,2][:], zdir='z') # set z value
    
    def get_plot(X, lim=2, frames=200, duration=45):
      fig = plt.figure()
      ax = p3.Axes3D(fig)
      ax.set_xlim(-lim, lim)
      ax.set_ylim(-lim, lim)
      ax.set_zlim(-lim, lim)
      points = ax.scatter(X[0][:,0][:], X[0][:,1][:], X[0][:,2][:], depthshade=False) # x,y,z vals
      return animation.FuncAnimation(fig,
        update_points,
        frames,
        interval=duration,
        fargs=(points, X),
        blit=False  
      ).to_jshtml()
    
    HTML(get_plot(X, frames=int(X.shape[0])))
    

    [animated 3D scatter plot of the first frames of the dance data]

    So far, so good. Next I shift and scale the x, y, and z features into the 0:1 range:

    X -= np.amin(X, axis=(0, 1))
    X /= np.amax(X, axis=(0, 1))
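
    To confirm the rescaling did what was intended, we can check the per-axis extrema (a quick sanity check):

    print(np.amin(X, axis=(0, 1))) # expect [0. 0. 0.]
    print(np.amax(X, axis=(0, 1))) # expect [1. 1. 1.]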
    

    Visualizing the result with HTML(get_plot(X, frames=int(X.shape[0]))) shows the data is now nicely centered. Next, I build the model itself using the Sequential API in Keras:

    from keras.models import Sequential, Model
    from keras.layers import Dense, LSTM, Dropout, Activation
    from keras.layers.advanced_activations import LeakyReLU
    from keras.losses import mean_squared_error
    from keras.optimizers import Adam
    import keras, os
    
    # NB: MDN, get_mixture_loss_func, and sample_from_output are assumed to
    # come from the keras-mdn-layer package (`pip install keras-mdn-layer`)
    from mdn import MDN, get_mixture_loss_func, sample_from_output
    
    # config
    look_back = 32 # number of previous time frames to use to predict the positions at time i
    lstm_cells = 256 # number of cells in each LSTM "layer"
    n_features = int(X.shape[1]) * int(X.shape[2]) # number of coordinate values to be predicted by each of `m` models
    input_shape = (look_back, n_features) # shape of inputs
    m = 32 # number of gaussian models to build
    
    # set boolean controlling whether we use MDN or not
    use_mdn = True
    
    model = Sequential()
    model.add(LSTM(lstm_cells, return_sequences=True, input_shape=input_shape))
    model.add(LSTM(lstm_cells, return_sequences=True))
    model.add(LSTM(lstm_cells))
    
    if use_mdn:
      model.add(MDN(n_features, m))
      model.compile(loss=get_mixture_loss_func(n_features, m), optimizer=Adam(lr=0.000001))
    else:
      model.add(Dense(n_features, activation='tanh'))
      model.compile(loss=mean_squared_error, optimizer='sgd')
    
    model.summary()
    

    Once the model is built, I shape the training data so that each sample contains the previous look_back time slices and each target contains the body positions at the next time step:

    # get training data in right shape
    train_x = []
    train_y = []
    
    n_time, n_obs, n_attrs = [int(i) for i in X.shape]
    
    for i in range(look_back, n_time-1, 1):
      train_x.append( X[i-look_back:i].reshape(look_back, n_obs * n_attrs) )
      train_y.append( X[i+1].ravel() )
    
    train_x = np.array(train_x)
    train_y = np.array(train_y)
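
    With look_back = 32 and 55 * 3 = 165 features per frame, the shapes should line up with the model's input_shape; a quick check (the 593 comes from the 626 frames minus the window and the one-frame lookahead):

    print(train_x.shape) # (593, 32, 165)
    print(train_y.shape) # (593, 165)
    assert train_x.shape[1:] == input_shape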
    

    from livelossplot import PlotLossesKeras
    
    # fit the model
    model.fit(train_x, train_y, epochs=1024, batch_size=1, callbacks=[PlotLossesKeras()])

    After fitting, I generate new time slices, using the trained model to predict each next frame from the previous look_back frames:
    

    # generate `n_frames` of new output time slices
    n_frames = 3000
    
    # seed the data to plot with the first `look_back` animation frames
    data = X[0:look_back]
    
    x0, x1, x2 = [int(i) for i in train_x.shape]
    d0, d1, d2 = [int(i) for i in data.shape]
    
    for i in range(look_back, n_frames, 1):
      # get the model's prediction for the next position of points at time `i`
      result = model.predict(train_x[i].reshape(1, x1, x2))
      # if using the mixed density network, pull out vals that describe vertex positions
      if use_mdn:
        result = np.apply_along_axis(sample_from_output, 1, result, n_features, m, temp=1.0)
      # reshape the result into the form of rows in `X`
      result = result.reshape(1, d1, d2)
      # push the result into the shape of `train_x` observations
      stacked = np.vstack((data[i-look_back+1:i], result)).reshape(1, x1, x2)
      # add the result to the `train_x` observations
      train_x = np.vstack((train_x, stacked))
      # add the result to the dataset for plotting
      data = np.vstack((data[:i], result))
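
    For reference, sampling from a unigaussian MDN output conceptually means choosing one mixture component according to its weight, then drawing from that component's gaussian. A minimal sketch of the role sample_from_output plays (assuming a flat [mus, sigmas, alphas] layout with sigmas already positive and alphas already normalized):

    import numpy as np
    
    def sample_from_mdn_params(params, output_dim, n_mixes, temp=1.0):
      '''Draw one sample from flat MDN parameters [mus, sigmas, alphas]'''
      mus = params[:n_mixes * output_dim].reshape(n_mixes, output_dim)
      sigmas = params[n_mixes * output_dim:n_mixes * output_dim + n_mixes]
      alphas = params[-n_mixes:]
      # temperature < 1 sharpens the mixture weights, > 1 flattens them
      weights = alphas ** (1.0 / temp)
      weights /= weights.sum()
      # pick one gaussian component, then sample every output dim from it
      i = np.random.choice(n_mixes, p=weights)
      return np.random.normal(loc=mus[i], scale=sigmas[i])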
    

    If I set use_mdn to False and instead use a simple sum-of-squared-errors loss (L2 loss), the resulting visualization looks a little spooky, but still has a generally human shape.

    If I set use_mdn to True, however, and use the custom MDN loss function, the results are very strange. I recognize that the MDN layer adds a huge number of parameters that need to be trained, and that it likely needs orders of magnitude more training data to achieve output as human-shaped as the L2 loss function's output.
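
    To put a rough number on that claim: with the configuration above (256 LSTM cells, n_features = 165, m = 32), and one sigma and one alpha per mixture component as in the MDN class in the answer below, the MDN's three dense heads alone add roughly 1.37 million parameters. A back-of-the-envelope sketch:

    lstm_cells, n_features, m = 256, 165, 32
    
    mus    = lstm_cells * m * n_features + m * n_features # weights + biases
    sigmas = lstm_cells * m + m
    alphas = lstm_cells * m + m
    print(mus + sigmas + alphas) # 1373408 additional parameters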

    That said, I'd like to ask those who have worked with neural network models more extensively than I have whether they see anything fundamentally wrong with the approach above. Any insight into this question would be enormously helpful.

    1 Answer

  •   duhaime  ·  answered 5 years ago

    Figured it out! The full working solution is in the linked gist. Here's the MDN class:

    from keras.layers.advanced_activations import LeakyReLU
    from keras.models import Sequential, Model
    from keras.layers import Dense, Input, concatenate, LSTM, CuDNNLSTM
    from keras.engine.topology import Layer
    from keras import backend as K
    import tensorflow_probability as tfp
    import tensorflow as tf
    
    # check tfp version, as tfp causes cryptic error if out of date
    assert float(tfp.__version__.split('.')[1]) >= 5
    
    class MDN(Layer):
      '''Mixture Density Network with unigaussian kernel'''
      def __init__(self, n_mixes, output_dim, **kwargs):
        self.n_mixes = n_mixes
        self.output_dim = output_dim
    
        with tf.name_scope('MDN'):
          self.mdn_mus    = Dense(self.n_mixes * self.output_dim, name='mdn_mus')
          self.mdn_sigmas = Dense(self.n_mixes, activation=K.exp, name='mdn_sigmas')
          self.mdn_alphas = Dense(self.n_mixes, activation=K.softmax, name='mdn_alphas')
        super(MDN, self).__init__(**kwargs)
    
      def build(self, input_shape):
        self.mdn_mus.build(input_shape)
        self.mdn_sigmas.build(input_shape)
        self.mdn_alphas.build(input_shape)
        self.trainable_weights = self.mdn_mus.trainable_weights + \
          self.mdn_sigmas.trainable_weights + \
          self.mdn_alphas.trainable_weights
        self.non_trainable_weights = self.mdn_mus.non_trainable_weights + \
          self.mdn_sigmas.non_trainable_weights + \
          self.mdn_alphas.non_trainable_weights
        self.built = True
    
      def call(self, x, mask=None):
        with tf.name_scope('MDN'):
          mdn_out = concatenate([
            self.mdn_mus(x),
            self.mdn_sigmas(x),
            self.mdn_alphas(x)
          ], name='mdn_outputs')
        return mdn_out
    
      def compute_output_shape(self, input_shape):
        # mus (n_mixes * output_dim) + sigmas (n_mixes) + alphas (n_mixes)
        return (input_shape[0], self.n_mixes * self.output_dim + 2 * self.n_mixes)
    
      def get_config(self):
        config = {
          'output_dim': self.output_dim,
          'n_mixes': self.n_mixes,
        }
        base_config = super(MDN, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
    
      def get_loss_func(self):
        def unigaussian_loss(y_true, y_pred):
          mix = tf.range(start = 0, limit = self.n_mixes)
          out_mu, out_sigma, out_alphas = tf.split(y_pred, num_or_size_splits=[
            self.n_mixes * self.output_dim,
            self.n_mixes,
            self.n_mixes,
          ], axis=-1, name='mdn_coef_split')
    
          def loss_i(i):
            batch_size = tf.shape(out_sigma)[0]
            sigma_i = tf.slice(out_sigma, [0, i], [batch_size, 1], name='mdn_sigma_slice')
            alpha_i = tf.slice(out_alphas, [0, i], [batch_size, 1], name='mdn_alpha_slice')
            mu_i = tf.slice(out_mu, [0, i * self.output_dim], [batch_size, self.output_dim], name='mdn_mu_slice')
            dist = tfp.distributions.Normal(loc=mu_i, scale=sigma_i)
            loss = dist.prob(y_true) # find the pdf around each value in y_true
            loss = alpha_i * loss
            return loss
    
          result = tf.map_fn(lambda  m: loss_i(m), mix, dtype=tf.float32, name='mix_map_fn')
          result = tf.reduce_sum(result, axis=0, keepdims=False)
          result = -tf.log(result)
          result = tf.reduce_mean(result)
          return result
    
        with tf.name_scope('MDNLayer'):
          return unigaussian_loss
    

    And the LSTM class:

    class LSTM_MDN:
      def __init__(self, n_verts=15, n_dims=3, n_mixes=2, look_back=1, cells=[32,32,32,32], use_mdn=True):
        self.n_verts = n_verts
        self.n_dims = n_dims
        self.n_mixes = n_mixes
        self.look_back = look_back
        self.cells = cells
        self.use_mdn = use_mdn
        self.LSTM = CuDNNLSTM if tf.test.is_gpu_available() else LSTM # use the GPU implementation when one is present
        self.model = self.build_model()
        if use_mdn:
          self.model.compile(loss=MDN(n_mixes, n_verts*n_dims).get_loss_func(), optimizer='adam', metrics=['accuracy'])
        else:
          self.model.compile(loss='mse', optimizer='adam', metrics=['accuracy'])
    
      def build_model(self):
        i = Input((self.look_back, self.n_verts*self.n_dims))
        h = self.LSTM(self.cells[0], return_sequences=True)(i) # return sequences, stateful
        h = self.LSTM(self.cells[1], return_sequences=True)(h)
        h = self.LSTM(self.cells[2])(h)
        h = Dense(self.cells[3])(h)
        if self.use_mdn:
          o = MDN(self.n_mixes, self.n_verts*self.n_dims)(h)
        else:
          o = Dense(self.n_verts*self.n_dims)(h)
        return Model(inputs=[i], outputs=[o])
    
      def prepare_inputs(self, X, look_back=2):
        '''
        Prepare inputs in shape expected by LSTM
        @returns:
          numpy.ndarray train_X: has shape: n_samples, lookback, verts * dims
          numpy.ndarray train_Y: has shape: n_samples, verts * dims
        '''
        # prepare data for the LSTM_MDN
        X = X.swapaxes(0, 1) # reshape to time, vert, dim
        n_time, n_verts, n_dims = X.shape
    
        # validate shape attributes
        if n_verts != self.n_verts: raise Exception(' ! got', n_verts, 'vertices, expected', self.n_verts)
        if n_dims != self.n_dims: raise Exception(' ! got', n_dims, 'dims, expected', self.n_dims)
        if look_back != self.look_back: raise Exception(' ! got', look_back, 'for look_back, expected', self.look_back)
    
        # lstm expects data in shape [samples_in_batch, timestamps, values]
        train_X = []
        train_Y = []
        for i in range(look_back, n_time, 1):
          train_X.append( X[i-look_back:i,:,:].reshape(look_back, n_verts * n_dims) ) # look_back, verts * dims
          train_Y.append( X[i,:,:].reshape(n_verts * n_dims) ) # verts * dims
        train_X = np.array(train_X) # n_samples, lookback, verts * dims
        train_Y = np.array(train_Y) # n_samples, verts * dims
        return [train_X, train_Y]
    
      def predict_positions(self, input_X):
        '''
        Predict the output for a series of input frames. Each prediction has shape (1, y), where y contains:
          mus = y[:n_mixes*n_verts*n_dims]
          sigs = y[n_mixes*n_verts*n_dims:-n_mixes]
          alphas = softmax(y[-n_mixes:])
        @param numpy.ndarray input_X: has shape: n_samples, look_back, n_verts * n_dims
        @returns:
          numpy.ndarray X: has shape: verts, time, dims
        '''
        predictions = []
        for i in range(input_X.shape[0]):
          # use `input_X` and the instance attributes rather than globals
          y = self.model.predict( input_X[i:i+1] ).squeeze()
          mus = y[:self.n_mixes*self.n_verts*self.n_dims]
          sigs = y[self.n_mixes*self.n_verts*self.n_dims:-self.n_mixes]
          alphas = self.softmax(y[-self.n_mixes:])
    
          # find the most likely distribution then pull out the mus that correspond to that selected index
          alpha_idx = np.argmax(alphas)
          predictions.append( mus[alpha_idx*self.n_verts*self.n_dims:(alpha_idx+1)*self.n_verts*self.n_dims] )
        predictions = np.array(predictions).reshape(input_X.shape[0], self.n_verts, self.n_dims).swapaxes(0, 1)
        return predictions # shape = n_verts, n_time, n_dims
    
      def softmax(self, x):
        '''Compute softmax values for vector `x`'''
        r = np.exp(x - np.max(x))
        return r / r.sum()
    

    # `data.selected.X` comes from the author's data loader; any array of
    # shape (n_verts, n_time, n_dims) will do here
    X = data.selected.X
    n_verts, n_time, n_dims = X.shape
    n_mixes = 3
    look_back = 2
    
    lstm_mdn = LSTM_MDN(n_verts=n_verts, n_dims=n_dims, n_mixes=n_mixes, look_back=look_back)
    train_X, train_Y = lstm_mdn.prepare_inputs(X, look_back=look_back)
    

    The gist linked above has the full gory details, in case anyone wants to copy this and pull it apart to better understand the mechanics...