jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
kmeans_compressor.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2013 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #include "kmeans_compressor.hpp"
18 
19 #include <algorithm>
20 #include <cfloat>
21 #include <cmath>
22 #include <limits>
23 #include <utility>
24 #include <vector>
25 #include <stack>
26 
27 #include "../common/assert.hpp"
28 
29 using std::min;
30 using std::max;
31 using std::stack;
32 using std::pair;
33 using std::vector;
34 
35 namespace jubatus {
36 namespace core {
37 namespace clustering {
38 namespace compressor {
39 
40 namespace {
41 
42 struct compare_by_first {
43  template <typename T, typename S>
44  bool operator() (
45  const std::pair<T, S>& p1,
46  const std::pair<T, S>& p2) const {
47  return p1.first < p2.first;
48  }
49 };
50 
51 template <typename T>
52 void partial_sort_by(
53  const std::vector<double>& scores,
54  std::vector<T>& array,
55  size_t size) {
56  JUBATUS_ASSERT_EQ(scores.size(),
57  array.size(),
58  "lengths of scores and data must be same.");
59 
60  std::vector<std::pair<double, T> > pairs(scores.size());
61  for (size_t i = 0; i < scores.size(); ++i) {
62  pairs[i].first = scores[i];
63  swap(pairs[i].second, array[i]);
64  }
65  size = std::min(size, pairs.size());
66  std::partial_sort(pairs.begin(),
67  pairs.begin() + size,
68  pairs.end(),
69  compare_by_first());
70  array.resize(size);
71  for (size_t i = 0; i < size; ++i) {
72  swap(pairs[i].second, array[i]);
73  }
74 }
75 
76 void bicriteria_as_coreset(
77  const wplist& src,
78  wplist bic,
79  const size_t dstsize,
80  wplist& dst) {
81  JUBATUS_ASSERT_GE(dstsize, dst.size(), "");
82 
83  typedef wplist::const_iterator citer;
84  typedef wplist::iterator iter;
85  bic.resize(dstsize - dst.size());
86  for (iter it = bic.begin(); it != bic.end(); ++it) {
87  it->weight = 0;
88  }
89  for (citer it = dst.begin(); it != dst.end(); ++it) {
90  pair<int, double> m = min_dist(*it, bic);
91  bic[m.first].weight -= it->weight;
92  }
93  for (citer it = src.begin(); it != src.end(); ++it) {
94  pair<int, double> m = min_dist(*it, bic);
95  bic[m.first].weight += it->weight;
96  }
97  dst.insert(dst.end(), bic.begin(), bic.end());
98  for (iter it = dst.begin(); it != dst.end(); ++it) {
99  if (it->weight < 0) {
100  it->weight = 0;
101  }
102  }
103 }
104 
105 } // namespace
106 
108  : compressor(cfg) {
109 }
110 
112 }
113 
115  const wplist& src,
116  size_t bsize,
117  size_t dstsize,
118  wplist& dst) {
119  if (dstsize >= src.size()) {
120  concat(src, dst);
121  return;
122  }
123  wplist bicriteria;
124  get_bicriteria(src, bsize, dstsize, bicriteria);
125  if (bicriteria.size() < dstsize) {
126  wplist srcclone = src;
128  srcclone,
129  bicriteria,
130  dstsize - bicriteria.size(),
131  dst);
132  }
133  bicriteria_as_coreset(src, bicriteria, dstsize, dst);
134 }
135 
137  const wplist& src,
138  size_t bsize,
139  size_t dstsize,
140  wplist& dst) {
141  dst.clear();
142  wplist resid = src;
143  vector<double> weights(src.size());
144  double r =
145  (1 - std::exp(bsize * (std::log(bsize) - std::log(src.size())) / dstsize))
146  / 2;
147  r = max(0.1, r);
148  std::vector<size_t> ind(bsize);
149  while (resid.size() > 1 && dst.size() < dstsize) {
150  weights.resize(resid.size());
151  for (wplist::const_iterator it = resid.begin(); it != resid.end(); ++it) {
152  weights[it - resid.begin()] = it->weight;
153  }
154 
155  // Sample `bsize` points and insert them to the result
156  discrete_distribution d(weights.begin(), weights.end());
157  std::generate(ind.begin(), ind.end(), d);
158 
159  std::sort(ind.begin(), ind.end());
160  ind.erase(std::unique(ind.begin(), ind.end()), ind.end());
161 
162  for (std::vector<size_t>::iterator it = ind.begin();
163  it != ind.end(); ++it) {
164  dst.push_back(resid[*it]);
165  }
166 
167  // Remove `r` nearest points from `resid`
168  std::vector<double> distances;
169  for (wplist::iterator itr = resid.begin(); itr != resid.end(); ++itr) {
170  distances.push_back(-min_dist(*itr, dst).second);
171  }
172  // TODO(unno): Is `r` lesser than 1.0?
173  size_t size = std::min(resid.size(),
174  static_cast<size_t>(resid.size() * r));
175  if (size == 0) {
176  size = 1;
177  }
178 
179  partial_sort_by(distances, resid, size);
180  }
181 }
182 
184  const weighted_point& p,
185  double min_dist,
186  const weighted_point& bp,
187  double bp_score,
188  double weight_sum,
189  double squared_min_dist_sum) {
190  return std::ceil(weight_sum * (
191  min_dist * min_dist * p.weight / squared_min_dist_sum)) + 1;
192 }
193 
195  const wplist& src,
196  const wplist& bicriteria,
197  size_t dstsize,
198  wplist& dst) {
199  if (bicriteria.size() == 0) {
200  dst = src;
201  return;
202  }
203  double weight_sum = 0;
204  double squared_min_dist_sum = 0;
205  std::vector<size_t> nearest_indexes(src.size());
206  std::vector<double> nearest_distances(src.size());
207  std::vector<double> bicriteria_scores(bicriteria.size());
208  for (size_t i = 0; i < src.size(); ++i) {
209  double weight = src[i].weight;
210  std::pair<int, double> m = min_dist(src[i], bicriteria);
211  nearest_indexes[i] = m.first;
212  nearest_distances[i] = m.second;
213 
214  bicriteria_scores[m.first] += weight;
215  squared_min_dist_sum += m.second * m.second * weight;
216  weight_sum += weight;
217  }
218  squared_min_dist_sum = std::max(squared_min_dist_sum,
219  std::numeric_limits<double>::min());
220  std::vector<double> weights;
221  double sumw = 0;
222  for (size_t i = 0; i < src.size(); ++i) {
223  double prob = get_probability(
224  src[i],
225  nearest_distances[i],
226  bicriteria[nearest_indexes[i]],
227  bicriteria_scores[nearest_indexes[i]],
228  weight_sum,
229  squared_min_dist_sum);
230  weights.push_back(prob);
231  sumw += prob;
232  }
233 
234  discrete_distribution d(weights.begin(), weights.end());
235 
236  std::vector<size_t> ind(dstsize);
237  std::generate(ind.begin(), ind.end(), d);
238 
239  for (std::vector<size_t>::iterator it = ind.begin(); it != ind.end(); ++it) {
240  size_t index = *it;
241  weighted_point sample = src[index];
242  double prob = weights[index] / sumw;
243  sample.weight *= 1.0 / dstsize / prob;
244  dst.push_back(sample);
245  }
246 }
247 
248 } // namespace compressor
249 } // namespace clustering
250 } // namespace core
251 } // namespace jubatus
#define JUBATUS_ASSERT_EQ(a, b, messages)
Definition: assert.hpp:63
void compress(const wplist &src, size_t bsize, size_t dstsize, wplist &dst)
void get_bicriteria(const wplist &src, size_t bsize, size_t dstsize, wplist &dst)
void concat(const wplist &src, wplist &dst)
Definition: util.cpp:33
pair< size_t, double > min_dist(const common::sfv_t &p, const vector< common::sfv_t > &P)
Definition: util.cpp:182
#define JUBATUS_ASSERT_GE(a, b, messages)
Definition: assert.hpp:71
virtual double get_probability(const weighted_point &p, double min_dist, const weighted_point &nearest_bp, double bp_score, double weight_sum, double squared_min_dist_sum)
void swap(weighted_point &p1, weighted_point &p2)
Definition: types.hpp:47
void bicriteria_to_coreset(const wplist &src, const wplist &bicriteria, size_t dstsize, wplist &dst)
std::vector< weighted_point > wplist
Definition: types.hpp:55