jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
clustering.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2013 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #include "clustering.hpp"
18 
19 #include <algorithm>
20 #include <cmath>
21 #include <cassert>
22 #include <string>
23 #include <vector>
24 #include "jubatus/util/lang/function.h"
25 #include "jubatus/util/lang/bind.h"
26 
27 #include "../common/jsonconfig.hpp"
29 #include "storage_factory.hpp"
30 
31 using jubatus::util::lang::shared_ptr;
32 
33 namespace jubatus {
34 namespace core {
35 namespace clustering {
36 
38  const std::string& name,
39  const std::string& method,
40  const clustering_config& cfg)
41  : config_(cfg),
42  name_(name),
43  method_(method),
44  storage_() {
45 
46  // TODO(@rimms): move to factory
47  if (method =="gmm" &&
48  cfg.compressor_method == "compressive_kmeans") {
50  "method = gmm, compressor_method != compressive_kmeans"));
51  }
52 
53  // TODO(@rimms): move to factory
54  if (method =="kmeans" &&
55  cfg.compressor_method == "compressive_gmm") {
57  "method = kmeans, compressor_method != compressive_gmm"));
58  }
59 
60  if (!(1 <= cfg.k)) {
61  throw JUBATUS_EXCEPTION(
62  common::invalid_parameter("1 <= k"));
63  }
64 
65  if (!(2 <= cfg.bucket_size)) {
66  throw JUBATUS_EXCEPTION(
67  common::invalid_parameter("2 <= bucket_size"));
68  }
69 
70  if (!(2 <= cfg.bucket_length)) {
71  throw JUBATUS_EXCEPTION(
72  common::invalid_parameter("2 <= bucket_length"));
73  }
74 
75  if (!(1 <= cfg.bicriteria_base_size &&
78  "1 <= bicriteria_base_size < compressed_bucket_size"));
79  }
80 
81  if (!(cfg.compressed_bucket_size < cfg.bucket_size)) {
82  throw JUBATUS_EXCEPTION(
83  common::invalid_parameter("compressed_bucket_size < bucket_size"));
84  }
85 
86  if (!(0.0 <= cfg.forgetting_factor)) {
87  throw JUBATUS_EXCEPTION(
88  common::invalid_parameter("0.0 <= forgetting_factor"));
89  }
90 
91  if (!(0.0 <= cfg.forgetting_threshold &&
92  cfg.forgetting_threshold <= 1.0)) {
94  "0.0 <= forgetting_threshold <= 1.0"));
95  }
96 
97  init();
98 }
99 
100 clustering::~clustering() {
101 }
102 
103 void clustering::init() {
107 }
108 
109 void clustering::set_storage(shared_ptr<storage> storage) {
110  storage->add_event_listener(REVISION_CHANGE,
111  jubatus::util::lang::bind(&clustering::update_clusters,
112  this, jubatus::util::lang::_1, true));
113  storage->add_event_listener(UPDATE,
114  jubatus::util::lang::bind(&clustering::update_clusters,
115  this, jubatus::util::lang::_1, false));
116  storage_.reset(new mixable_storage(storage));
117 }
118 
119 jubatus::util::lang::shared_ptr<storage> clustering::get_storage() {
120  return storage_->get_model();
121 }
122 
123 void clustering::update_clusters(const wplist& points, bool batch) {
124  if (batch) {
125  clustering_method_->batch_update(points);
126  } else {
127  clustering_method_->online_update(points);
128  }
129 }
130 
131 void clustering::set_clustering_method(
132  shared_ptr<clustering_method> clustering_method) {
133  clustering_method_ = clustering_method;
134 }
135 
136 bool clustering::push(const std::vector<weighted_point>& points) {
137  jubatus::util::lang::shared_ptr<storage> sto = storage_->get_model();
138  for (std::vector<weighted_point>::const_iterator it = points.begin();
139  it != points.end(); ++it) {
140  sto->add(*it);
141  }
142  return true;
143 }
144 
145 wplist clustering::get_coreset() const {
146  return storage_->get_model()->get_all();
147 }
148 
149 std::vector<common::sfv_t> clustering::get_k_center() const {
150  return clustering_method_->get_k_center();
151 }
152 
153 common::sfv_t clustering::get_nearest_center(const common::sfv_t& point) const {
154  return clustering_method_->get_nearest_center(point);
155 }
156 
157 wplist clustering::get_nearest_members(const common::sfv_t& point) const {
158  int64_t clustering_id = clustering_method_->get_nearest_center_index(point);
159  if (clustering_id == -1) {
160  return wplist();
161  }
162  return clustering_method_->get_cluster(clustering_id, get_coreset());
163 }
164 
165 std::vector<wplist> clustering::get_core_members() const {
166  return clustering_method_->get_clusters(get_coreset());
167 }
168 
169 size_t clustering::get_revision() const {
170  return storage_->get_model()->get_revision();
171 }
172 
173 framework::mixable* clustering::get_mixable() const {
174  return storage_.get();
175 }
176 
177 std::string clustering::type() const {
178  return "clustering";
179 }
180 
181 void clustering::pack(framework::packer& pk) const {
182  storage_->get_model()->pack(pk);
183 }
184 
185 void clustering::unpack(msgpack::object o) {
186  storage_->get_model()->unpack(o);
187 }
188 
189 void clustering::clear() {
190  storage_->get_model()->clear();
191 }
192 
193 void clustering::do_clustering() {
194  clustering_method_->batch_update(storage_->get_model()->get_all());
195 }
196 
197 } // namespace clustering
198 } // namespace core
199 } // namespace jubatus
framework::linear_mixable_helper< storage, diff_t > mixable_storage
Definition: storage.hpp:79
void set_storage(jubatus::util::lang::shared_ptr< storage > storage)
Definition: clustering.cpp:109
#define JUBATUS_EXCEPTION(e)
Definition: exception.hpp:79
std::string method
static jubatus::util::lang::shared_ptr< storage > create(const std::string &name, const clustering_config &config)
jubatus::util::lang::shared_ptr< clustering_method > clustering_method_
Definition: clustering.hpp:89
void update_clusters(const wplist &points, bool batch)
Definition: clustering.cpp:123
void set_clustering_method(jubatus::util::lang::shared_ptr< clustering_method > clustering_method)
Definition: clustering.cpp:131
std::vector< std::pair< std::string, float > > sfv_t
Definition: type.hpp:29
jubatus::util::lang::shared_ptr< mixable_storage > storage_
Definition: clustering.hpp:90
std::vector< weighted_point > wplist
Definition: types.hpp:55
static jubatus::util::lang::shared_ptr< clustering_method > create(const std::string &method, const clustering_config &config)