Commit 1a931e90 authored by Yuxin Wu

move infogan-specific things (prior,noise) to examples.

parent 9d6197aa
@@ -59,11 +59,23 @@ class Model(GANModelDesc):
# latent space is cat(10) x uni(1) x uni(1) x noise(NOISE_DIM)
self.factors = ProductDistribution("factors", [CategoricalDistribution("cat", 10),
GaussianDistributionUniformPrior("uni_a", 1),
GaussianDistributionUniformPrior("uni_b", 1),
NoiseDistribution("noise", NOISE_DIM)])
z = self.factors.sample_prior(BATCH, name='zc')
GaussianDistribution("uni_a", 1),
GaussianDistribution("uni_b", 1),
])
# sample the latent code zc:
idxs = tf.squeeze(tf.multinomial(tf.zeros([BATCH, 10]), 1), 1)
sample = tf.one_hot(idxs, 10)
z_cat = symbf.remove_shape(sample, 0, name='z_cat')
z_uni_a = symbf.remove_shape(
tf.random_uniform([BATCH, 1], -1, 1), 0, name='z_uni_a')
z_uni_b = symbf.remove_shape(
tf.random_uniform([BATCH, 1], -1, 1), 0, name='z_uni_b')
z_noise = symbf.remove_shape(
tf.random_uniform([BATCH, NOISE_DIM], -1, 1), 0, name='z_noise')
zc = tf.concat_v2([z_cat, z_uni_a, z_uni_b], 1, name='z_code')
z = tf.concat_v2([zc, z_noise], 1, name='z')
with argscope([Conv2D, Deconv2D, FullyConnected],
W_init=tf.truncated_normal_initializer(stddev=0.02)):
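The latent code is now sampled explicitly in the example instead of through Distribution.sample_prior. A condensed sketch of the same sampling, with the remove_shape wrappers omitted (the BATCH and NOISE_DIM values here are assumed, the example defines its own; tf.concat_v2 is the TF-0.12 name of what later became tf.concat):

import tensorflow as tf

BATCH, NOISE_DIM = 128, 62   # assumed values for illustration only

# zero logits make tf.multinomial pick each of the 10 classes with
# probability 1/10, i.e. the uniform categorical prior
idxs = tf.squeeze(tf.multinomial(tf.zeros([BATCH, 10]), 1), 1)
z_cat = tf.one_hot(idxs, 10)

# the two continuous codes and the noise all use a U(-1, 1) prior
z_uni_a = tf.random_uniform([BATCH, 1], -1, 1)
z_uni_b = tf.random_uniform([BATCH, 1], -1, 1)
z_noise = tf.random_uniform([BATCH, NOISE_DIM], -1, 1)

zc = tf.concat_v2([z_cat, z_uni_a, z_uni_b], 1)   # the structured code
z = tf.concat_v2([zc, z_noise], 1)                # full generator input

The remove_shape calls in the actual diff additionally clear the static batch dimension, so the same graph can later be fed any batch size.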
@@ -79,20 +91,36 @@ class Model(GANModelDesc):
with tf.variable_scope('discrim', reuse=True):
fake_pred, dist_param = self.discriminator(fake_sample)
# post-process all dist_params from discriminator
# post-process output vector from discriminator to become valid
# distribution parameters
encoder_activation = self.factors.encoder_activation(dist_param)
"""
Mutual information between x (i.e. zc in this case) and some
information s (the generated samples in this case):
I(x;s) = H(x) - H(x|s)
= H(x) + E[\log P(x|s)]
The distribution from which zc is sampled is, in this case, a fixed prior, so H(x) is a constant.
For the second term, we can maximize its variational lower bound:
E_{x \sim P(x|s)}[\log Q(x|s)]
where Q(x|s) is a proposal distribution to approximate P(x|s).
Here, Q(x|s) is assumed to be a distribution which shares the form
of self.factors, and whose parameters are predicted by the discriminator network.
"""
with tf.name_scope("mutual_information"):
MIs = self.factors.mutual_information(z, encoder_activation)
mi = tf.add_n(MIs, name="total")
summary.add_moving_summary(MIs + [mi])
ents = self.factors.entropy(zc, encoder_activation)
cond_entropy = tf.add_n(ents, name="total_conditional_entropy")
summary.add_moving_summary(cond_entropy, *ents)
# default GAN objective
self.build_losses(real_pred, fake_pred)
# subtract mutual information for latent factors (we want to maximize them)
self.g_loss = tf.subtract(self.g_loss, mi, name='total_g_loss')
self.d_loss = tf.subtract(self.d_loss, mi, name='total_d_loss')
self.g_loss = tf.add(self.g_loss, cond_entropy, name='total_g_loss')
self.d_loss = tf.add(self.d_loss, cond_entropy, name='total_d_loss')
summary.add_moving_summary(self.g_loss, self.d_loss)
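Why adding the conditional entropy works: with a fixed prior, H(x) (here, H(zc)) is a constant, so maximizing I(zc;s) = H(zc) - H(zc|s) is exactly minimizing H(zc|s). A back-of-the-envelope check of that constant, not part of the diff:

import numpy as np

H_cat = np.log(10)         # entropy of a uniform categorical over 10 classes
H_uni = np.log(2)          # differential entropy of U(-1, 1)
H_zc = H_cat + 2 * H_uni   # ~3.69 nats, constant w.r.t. both networks

The hunks below then strip the now-unused prior machinery (loglikelihood_prior, mutual_information, prior_entropy, sample_prior and NoiseDistribution) out of the distributions library.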
import tensorflow as tf
from functools import wraps
import numpy as np
from ..utils import logger
from ..tfutils import get_name_scope_name
__all__ = ['Distribution',
'CategoricalDistribution', 'GaussianDistributionUniformPrior',
'NoiseDistribution', 'ProductDistribution']
# TODO encoder_activation and the ProductDistribution class bring many redundant concat and split ops
'CategoricalDistribution', 'GaussianDistribution',
'ProductDistribution']
def class_scope(func):
@@ -73,70 +70,6 @@ class Distribution(object):
assert ret.get_shape().ndims == 1, ret.get_shape()
return ret
@class_scope
def loglikelihood_prior(self, x):
"""likelihood from prior for this distribution
Args:
x: samples of shape (batch, sample_dim)
Returns:
a symbolic vector containing the loglikelihood of each sample,
evaluated under the prior of this distribution.
"""
assert x.get_shape().ndims == 2 and \
x.get_shape()[1] == self.sample_dim, \
x.get_shape()
batch_size = x.get_shape().as_list()[0]
s = self.prior(batch_size)
return self._loglikelihood(x, s)
@class_scope
def mutual_information(self, x, theta):
"""
Approximates mutual information between x and some information s.
Here we return a variational lower bound of the mutual information,
assuming a proposal distribution Q(x|s) (which approximates P(x|s))
has the form of this distribution parameterized by theta.
.. math::
I(x;s) = H(x) - H(x|s)
= H(x) + E[\log P(x|s)]
\\ge H(x) + E_{x \sim P(x|s)}[\log Q(x|s)]
Args:
x: samples of shape (batch, sample_dim)
theta: parameters defining the proposal distribution Q. shape (batch, param_dim).
Returns:
lower-bounded mutual information, a scalar tensor.
"""
entr = self.prior_entropy(x)
cross_entr = self.entropy(x, theta)
return tf.subtract(entr, cross_entr, name="mutual_information")
@class_scope
def prior_entropy(self, x):
r"""
Estimated entropy of the prior distribution,
averaged over a batch of samples. The likelihood of each sample
is evaluated under the prior distribution.
.. math::
H(x) = -E[\log p(x_i)], \text{where } p \text{ is the prior}
Args:
x: samples of shape (batch, sample_dim)
Returns:
a scalar, estimated entropy.
"""
return tf.reduce_mean(-self.loglikelihood_prior(x), name="prior_entropy")
@class_scope
def entropy(self, x, theta):
r""" Entropy of this distribution parameterized by theta,
@@ -155,16 +88,6 @@ class Distribution(object):
"""
return tf.reduce_mean(-self.loglikelihood(x, theta), name="entropy")
@class_scope
def prior(self, batch_size):
"""Get the prior parameters of this distribution.
Returns:
a (batch, param_dim) 2D tensor, containing priors of
this distribution repeated for batch_size times.
"""
return self._prior(batch_size)
@class_scope
def encoder_activation(self, dist_param):
""" An activation function to produce
@@ -178,19 +101,6 @@ class Distribution(object):
"""
return self._encoder_activation(dist_param)
def sample_prior(self, batch_size):
"""
Sample a batch of data with the prior distribution.
Args:
batch_size(int):
Returns:
samples of shape (batch, sample_dim)
"""
s = self._sample_prior(batch_size)
return s
@property
def param_dim(self):
"""
@@ -210,12 +120,6 @@ class Distribution(object):
def _loglikelihood(self, x, theta):
raise NotImplementedError
def _prior(self, batch_size):
raise NotImplementedError
def _sample_prior(self, batch_size):
raise NotImplementedError
def _encoder_activation(self, dist_param):
return dist_param
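With the prior-related stubs gone, a subclass only needs _loglikelihood, optionally _encoder_activation, and the two dimension properties. A hypothetical minimal subclass, purely to illustrate the slimmed-down contract (not part of this commit):

class BernoulliDistribution(Distribution):
    """Hypothetical example of the reduced Distribution interface."""
    def __init__(self, name, dim):
        super(BernoulliDistribution, self).__init__(name)
        self.dim = dim

    def _loglikelihood(self, x, theta):
        eps = 1e-8   # guard against log(0), as the other subclasses do
        return tf.reduce_sum(x * tf.log(theta + eps) +
                             (1. - x) * tf.log(1. - theta + eps),
                             reduction_indices=1)

    def _encoder_activation(self, dist_param):
        return tf.nn.sigmoid(dist_param)   # squash raw outputs into [0, 1]

    @property
    def param_dim(self):
        return self.dim

    @property
    def sample_dim(self):
        return self.dim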
@@ -236,15 +140,6 @@ class CategoricalDistribution(Distribution):
eps = 1e-8
return tf.reduce_sum(tf.log(theta + eps) * x, reduction_indices=1)
def _prior(self, batch_size):
return tf.constant(1.0 / self.cardinality,
tf.float32, [batch_size, self.cardinality])
def _sample_prior(self, batch_size):
ids = tf.multinomial(tf.zeros([batch_size, self.cardinality]), num_samples=1)[:, 0]
ret = tf.one_hot(ids, self.cardinality)
return ret
def _encoder_activation(self, dist_param):
return tf.nn.softmax(dist_param)
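The categorical log-likelihood above pairs with this softmax activation; a toy check with made-up numbers:

import tensorflow as tf

x = tf.constant([[0., 1., 0.]])           # a one-hot sample
theta = tf.constant([[0.2, 0.7, 0.1]])    # an already-softmaxed prediction
eps = 1e-8
ll = tf.reduce_sum(tf.log(theta + eps) * x, 1)   # -> log(0.7) ~ -0.357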
@@ -257,17 +152,14 @@ class CategoricalDistribution(Distribution):
return self.cardinality
class GaussianDistributionUniformPrior(Distribution):
"""Gaussian distribution with prior U(-1,1).
It implements a Gaussian whose :meth:`sample_prior` method samples uniformly.
"""
class GaussianDistribution(Distribution):
def __init__(self, name, dim, fixed_std=True):
"""
Args:
dim(int): the dimension of samples.
fixed_std (bool): if True, will use 1 as std for all dimensions.
"""
super(GaussianDistributionUniformPrior, self).__init__(name)
super(GaussianDistribution, self).__init__(name)
self.dim = dim
self.fixed_std = fixed_std
@@ -287,16 +179,6 @@ class GaussianDistributionUniformPrior(Distribution):
reduction_indices=1
)
def _prior(self, batch_size):
if self.fixed_std:
return tf.zeros([batch_size, self.param_dim])
else:
return tf.concat_v2([tf.zeros([batch_size, self.param_dim]),
tf.ones([batch_size, self.param_dim])], 1)
def _sample_prior(self, batch_size):
return tf.random_uniform([batch_size, self.dim], -1, 1)
def _encoder_activation(self, dist_param):
if self.fixed_std:
return dist_param
@@ -318,42 +200,6 @@ class GaussianDistributionUniformPrior(Distribution):
return self.dim
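For reference, the quantity reduced over reduction_indices=1 above is a diagonal Gaussian log-density; with the default fixed_std=True it simplifies to the following sketch (assumed from the docstring's "use 1 as std"):

import numpy as np
import tensorflow as tf

def gaussian_loglik_fixed_std(x, mean):
    # log N(x; mean, I) summed over the sample dimension:
    # -d/2 * log(2*pi) - ||x - mean||^2 / 2
    return tf.reduce_sum(-0.5 * np.log(2 * np.pi)
                         - 0.5 * tf.square(x - mean),
                         reduction_indices=1)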
class NoiseDistribution(Distribution):
"""This is not really a distribution.
It is the uniform noise input of GAN which shares interface with Distribution, to
simplify implementation of GAN.
"""
def __init__(self, name, dim):
"""
Args:
dim(int): the dimension of the noise.
"""
# TODO more options, e.g. use gaussian or uniform?
super(NoiseDistribution, self).__init__(name)
self.dim = dim
def _loglikelihood(self, x, theta):
return 0
def _prior(self):
return 0
def _sample_prior(self, batch_size):
zc = tf.random_uniform([batch_size, self.dim], -1, 1)
return zc
def _encoder_activation(self, dist_param):
return 0
@property
def param_dim(self):
return 0
@property
def sample_dim(self):
return self.dim
class ProductDistribution(Distribution):
"""A product of a list of independent distributions. """
def __init__(self, name, dists):
@@ -389,41 +235,21 @@ class ProductDistribution(Distribution):
yield s[:, offset:offset + off]
offset += off
def mutual_information(self, x, theta):
def entropy(self, x, theta):
"""
Return the entropy of each component distribution.
Note:
It returns a list, as one might use different weights for each
distribution.
Returns:
list[tf.Tensor]: mutual informations of each distribution.
list[tf.Tensor]: entropy of each distribution.
"""
MIs = [] # noqa
ret = []
for dist, xi, ti in zip(self.dists,
self._splitter(x, False),
self._splitter(theta, True)):
if dist.param_dim > 0:
MIs.append(dist.mutual_information(xi, ti))
return MIs
def sample_prior(self, batch_size, name='sample_prior'):
"""
Concat the samples from all distributions.
Returns:
tf.Tensor: a tensor of shape (batch, sample_dim), whose first dimension
is statically unknown, allowing inference with a custom batch size.
"""
samples = []
for k, dist in enumerate(self.dists):
init = dist._sample_prior(batch_size)
plh = tf.placeholder_with_default(init, [batch_size, dist.sample_dim], name='z_' + dist.name)
samples.append(plh)
logger.info("Placeholder for %s(%s) is %s " % (dist.name, dist.__class__.__name__, plh.name[:-2]))
return tf.concat_v2(samples, 1, name=name)
ret.append(dist.entropy(xi, ti))
return ret
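Because entropy now returns one tensor per factor, a caller may weight the factors before summing, as the Note above suggests. A hypothetical sketch, reusing factors, zc and encoder_activation from the example at the top of this commit:

weights = [1.0, 0.5, 0.5]    # assumed per-factor weights
ents = factors.entropy(zc, encoder_activation)
cond_entropy = tf.add_n([w * e for w, e in zip(weights, ents)],
                        name='weighted_conditional_entropy')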
def _encoder_activation(self, dist_params):
rsl = []
@@ -361,3 +361,28 @@ def soft_triplet_loss(anchor, positive, negative, extra=True):
return loss, pos_dist, neg_dist
else:
return loss
def remove_shape(x, axis, name):
"""
Make the static shape of a tensor less specific by
using :func:`tf.placeholder_with_default`.
See `tensorflow#5680
<https://github.com/tensorflow/tensorflow/issues/5680>`_.
Args:
x: a tensor
axis(int or list of ints): the axes whose static shape will be reset to None.
name(str): name of the output tensor
Returns:
a tensor equal to x, but shape information is partially cleared
"""
shp = x.get_shape().as_list()
if not isinstance(axis, list):
axis = [axis]
for a in axis:
if shp[a] is None:
raise ValueError("Axis {} of shape {} is already unknown!".format(a, shp))
shp[a] = None
x = tf.placeholder_with_default(x, shape=shp, name=name)
return x
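A quick usage sketch of the new helper (shapes made up):

import tensorflow as tf

x = tf.random_uniform([128, 4], -1, 1)
y = remove_shape(x, 0, name='y_flexible')
print(y.get_shape().as_list())   # [None, 4]: the batch axis is now
                                 # unconstrained, so any batch size can be fed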