jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
compressive_storage.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2013 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #include "compressive_storage.hpp"
18 
19 #include <string>
20 #include <vector>
21 #include "compressor.hpp"
22 #include "gmm_compressor.hpp"
23 #include "kmeans_compressor.hpp"
24 
25 namespace jubatus {
26 namespace core {
27 namespace clustering {
28 
30  const std::string& name,
31  const clustering_config& config)
32  : storage(name, config),
33  status_(0) {
34  mine_.push_back(wplist());
35 }
36 
38  jubatus::util::lang::shared_ptr<compressor::compressor> compressor) {
39  compressor_ = compressor;
40 }
41 
// Appends `point` to bucket 0 (mine_[0]). Once bucket 0 holds at least
// config_.bucket_size points, it is run through compressor_->compress(). cr
// is then swapped into bucket 0, status_ is incremented, and carry_up(0)
// propagates the change into the higher buckets.
43  wplist& c0 = mine_[0];
44  c0.push_back(point);
45  if (c0.size() >= static_cast<size_t>(config_.bucket_size)) {
46  wplist cr;
// NOTE(review): the argument line of this call (listing line 48) is missing
// from this extract. Presumably it compresses c0 into cr using the configured
// bucket sizes; confirm against the original source.
47  compressor_->compress(
49  c0.swap(cr);
// status_ only advances here, once per compression of bucket 0;
// is_next_bucket_full() reads status_ - 1.
50  status_ += 1;
// c0 is a reference into mine_, and carry_up() may push_back onto mine_
// (possibly reallocating it), so c0 must not be used after this call.
51  carry_up(0);
52 
// NOTE(review): listing line 53 is also missing from this extract.
54  }
55 }
56 
58  wplist ret;
59  for (std::vector<wplist>::const_iterator it = mine_.begin();
60  it != mine_.end(); ++it) {
61  concat(*it, ret);
62  }
63  return ret;
64 }
65 
67  double factor = std::exp(-config_.forgetting_factor);
68  typedef wplist::iterator iter;
69  for (iter it = points.begin(); it != points.end(); ++it) {
70  it->weight *= factor;
71  }
72 }
73 
75  double C = config_.forgetting_threshold;
76  double lam = config_.forgetting_factor;
77  if (std::exp(-lam * bucket_number) < C) {
78  return true;
79  }
80  return false;
81 }
82 
// Decides whether carry_up(bucket_number) compresses bucket `bucket_number`
// together with the next bucket (true) or just moves it up (false). It takes
// digit(status_ - 1, bucket_number, config_.bucket_length) and compares the
// result with a value on listing line 85, which is missing from this extract.
// NOTE(review): presumably digit(num, r, n) returns the r-th base-n digit of
// num; confirm against util.cpp. status_ is incremented before carry_up(0)
// is called on the point-adding path, so status_ - 1 is evaluated on a
// non-zero counter there.
83 bool compressive_storage::is_next_bucket_full(size_t bucket_number) {
// NOTE(review): digit() is declared as char digit(int, int, int), so
// bucket_number (and possibly status_ - 1) is narrowed to int here.
84  return digit(status_ - 1, bucket_number, config_.bucket_length) ==
86 }
87 
// Propagates a change in bucket r (an index into mine_) into bucket r + 1.
// The point-adding path starts at r = 0, and this recurses upward.
//  * Bucket r + 1 is created if it does not exist yet.
//  * Every weight in bucket r is decayed via forget_weight().
//  * If is_next_bucket_full(r) is false, bucket r moves up without
//    compression. It is appended to bucket r + 1 when r + 1 has not reached
//    the forgetting threshold, or when bucket r already holds every point
//    returned by get_mine(). Otherwise it replaces bucket r + 1, and r + 1's
//    old points are discarded.
//  * Otherwise buckets r and r + 1 are combined, compressed into bucket r + 1
//    with target size dstsize, and the carry continues at r + 1.
89  if (r >= mine_.size() - 1) {
90  mine_.push_back(wplist());
91  }
92  forget_weight(mine_[r]);
93  if (!is_next_bucket_full(r)) {
// NOTE(review): get_mine() concatenates every bucket just so its size can be
// compared. Summing mine_[i].size() would avoid that copy on every carry.
94  if (!reach_forgetting_threshold(r + 1) ||
95  mine_[r].size() == get_mine().size()) {
96  concat(mine_[r], mine_[r + 1]);
97  mine_[r].clear();
98  } else {
// After the swap, mine_[r] holds bucket r + 1's previous points, which the
// clear() below discards.
99  mine_[r + 1].swap(mine_[r]);
100  mine_[r].clear();
101  }
102  } else {
// NOTE(review): cr and crr are full copies of the two buckets. Swapping them
// out of mine_ would avoid the copies and make the clear() calls redundant.
103  wplist cr = mine_[r];
104  wplist crr = mine_[r + 1];
105  mine_[r].clear();
106  mine_[r + 1].clear();
// Presumably appends cr's points to crr (concat(src, dst)); confirm in util.cpp.
107  concat(cr, crr);
// Target size of the compressed bucket: compressed_bucket_size at level 0,
// 2 * r * r * compressed_bucket_size above it.
108  size_t dstsize = (r == 0) ? config_.compressed_bucket_size :
109  2 * r * r * config_.compressed_bucket_size;
// NOTE(review): the opening line of this call (listing line 110) is missing
// from this extract. mine_[r + 1] appears to receive the compressed result.
111  dstsize, mine_[r + 1]);
112  carry_up(r + 1);
113  }
114 }
115 
// Serializes this storage as a 4-element msgpack array:
//   [base storage state, mine_ (all buckets), status_, *compressor_].
// The element order must stay in sync with unpack_impl_, which reads the same
// four elements by position.
117  packer.pack_array(4);
118  storage::pack_impl_(packer);
119  packer.pack(mine_);
120  packer.pack(status_);
// NOTE(review): compressor_ is dereferenced unconditionally. The constructor
// does not initialize it, so set_compressor() must have been called first.
121  packer.pack(*compressor_);
122 }
123 
124 void compressive_storage::unpack_impl_(msgpack::object o) {
125  std::vector<msgpack::object> mems;
126  o.convert(&mems);
127  if (mems.size() != 4) {
128  throw msgpack::type_error();
129  }
130  storage::unpack_impl_(mems[0]);
131  mems[1].convert(&mine_);
132  mems[2].convert(&status_);
133  mems[3].convert(compressor_.get());
134 }
135 
138  mine_.clear();
139  mine_.push_back(wplist());
140  status_ = 0;
141 }
142 
143 } // namespace clustering
144 } // namespace core
145 } // namespace jubatus
char digit(int num, int r, int n)
Definition: util.cpp:37
void pack_impl_(framework::packer &packer) const
jubatus::util::lang::shared_ptr< compressor::compressor > compressor_
void concat(const wplist &src, wplist &dst)
Definition: util.cpp:33
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
virtual void unpack_impl_(msgpack::object o)
Definition: storage.cpp:115
void set_compressor(jubatus::util::lang::shared_ptr< compressor::compressor > compressor)
virtual void pack_impl_(framework::packer &packer) const
Definition: storage.cpp:112
compressive_storage(const std::string &name, const clustering_config &config)
std::vector< weighted_point > wplist
Definition: types.hpp:55