jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
burst.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2014 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #include "burst.hpp"
18 
19 #include <cfloat>
20 #include <utility>
21 #include <string>
22 #include <vector>
23 #include "jubatus/util/lang/noncopyable.h"
24 #include "jubatus/util/data/unordered_map.h"
25 
26 #include "../common/assert.hpp"
27 #include "../common/exception.hpp"
28 #include "../common/unordered_map.hpp"
29 #include "aggregator.hpp"
30 
31 using std::string;
32 using jubatus::util::lang::shared_ptr;
33 using jubatus::util::data::unordered_map;
34 
35 namespace jubatus {
36 namespace core {
37 namespace burst {
38 
40 
42  struct entry_t {
44  std::vector<burst_result> results;
45 
46  MSGPACK_DEFINE(params, results);
47  };
48  typedef unordered_map<string, entry_t> data_t;
49  data_t data;
50 
51  impl_() : data() {
52  }
53 
54  impl_(const impl_& x, const impl_& y) : data(x.data) {
55  for (data_t::const_iterator iter = y.data.begin();
56  iter != y.data.end(); ++iter) {
57  data_t::iterator found = data.find(iter->first);
58  if (found == data.end()) {
59  data.insert(*iter);
60  } else {
61  std::vector<burst_result>& results1 = found->second.results;
62  const std::vector<burst_result>& results2 = iter->second.results;
63  // simply merged; mixing is performed in put_diff
64  results1.insert(results1.end(), results2.begin(), results2.end());
65  }
66  }
67  }
68 
69  explicit impl_(msgpack::object o) : data() {
70  o.convert(this);
71  }
72 
73  MSGPACK_DEFINE(data);
74 };
75 
76 class burst::impl_ : jubatus::util::lang::noncopyable {
77  class storage_ {
78  public:
80  const burst_options& options, const keyword_params& params)
81  : s_(new result_storage(options.result_window_rotate_size)),
82  params_(params) {
83  }
84 
85  void put_diff(const diff_t::impl_::entry_t& diff) const {
86  s_->put_diff(diff.results);
87  }
88 
89  const shared_ptr<result_storage>& get_storage() const {
90  return s_;
91  }
92 
93  const keyword_params& get_params() const {
94  return params_;
95  }
96 
97  private:
98  shared_ptr<result_storage> s_;
100  };
101 
103  public:
105  const storage_& s)
106  : a_(new aggregator(options.window_batch_size,
107  options.batch_interval,
108  options.result_window_rotate_size)),
109  s_(s.get_storage()),
110  params_(s.get_params()),
111  removal_count_(-1) {
112  }
113 
114  bool add_document(int d, int r, double pos) const {
115  return a_->add_document(d, r, pos);
116  }
117 
118  void calculate_result(const burst_options& options) const {
119  a_->flush_results(params_.scaling_param,
120  params_.gamma,
121  options.costcut_threshold,
122  options.max_reuse_batch_num,
123  *s_);
124  }
125 
128  entry.params = params_;
129  entry.results = s_->get_diff();
130  return entry;
131  }
132 
133  const shared_ptr<aggregator>& get_aggregator() const {
134  return a_;
135  }
136 
137  const shared_ptr<result_storage>& get_storage() const {
138  return s_;
139  }
140 
141  const keyword_params& get_params() const {
142  return params_;
143  }
144 
146  if (removal_count_ < 0) {
148  }
149  }
150 
151  void set_processed() {
152  removal_count_ = -1;
153  }
154 
156  if (removal_count_ > 0) {
157  --removal_count_;
158  }
159  }
160 
161  bool is_to_be_removed() const {
162  return removal_count_ == 0;
163  }
164 
165  private:
166  shared_ptr<aggregator> a_;
167  shared_ptr<result_storage> s_;
170  };
171 
172  typedef unordered_map<string, storage_> storages_t;
173  typedef unordered_map<string, aggregate_helper_> aggregators_t;
174 
175  public:
176  explicit impl_(const burst_options& options)
177  : options_(options),
178  has_been_mixed_(false) {
179  if (!(options_.window_batch_size > 0)) {
181  "window_batch_size should > 0"));
182  }
183  if (!(options_.batch_interval > 0)) {
185  "batch_interval should > 0"));
186  }
187  if (!(options_.result_window_rotate_size > 0)) {
189  "result_window_rotate_size should > 0"));
190  }
191  if (!(options_.max_reuse_batch_num >= 0)) {
193  "max_reuse_batch_num should >= 0"));
194  }
195  if (!(options_.costcut_threshold > 0)) {
196  options_.costcut_threshold = DBL_MAX;
197  }
198  }
199 
200  bool add_keyword(const string& keyword,
201  const keyword_params& params,
202  bool processed_in_this_server) {
203  if (!(params.scaling_param > 1)) {
204  throw JUBATUS_EXCEPTION(
205  common::invalid_parameter("scaling_param must be > 1."));
206  }
207  if (!(params.gamma > 0)) {
208  throw JUBATUS_EXCEPTION(
209  common::invalid_parameter("gamma must be > 0."));
210  }
211 
212  if (storages_.count(keyword) > 0 || aggregators_.count(keyword) > 0) {
213  return false;
214  }
215 
216  std::pair<storages_t::iterator, bool> r =
217  storages_.insert(std::make_pair(keyword, storage_(options_, params)));
218 
219  JUBATUS_ASSERT_EQ(true, r.second, "");
220 
221  if (processed_in_this_server) {
222  aggregators_.insert(
223  std::make_pair(keyword,
224  aggregate_helper_(options_, r.first->second)));
225  }
226 
227  return true;
228  }
229 
230  bool remove_keyword(const string& keyword) {
231  aggregators_.erase(keyword);
232  return storages_.erase(keyword) > 0;
233  }
234 
236  clear();
237  return true;
238  }
239 
240  template<class Map>
241  static keyword_list get_keyword_list(const Map& m) {
242  keyword_list result;
243  result.reserve(m.size());
244 
245  for (typename Map::const_iterator iter = m.begin();
246  iter != m.end(); ++iter) {
247  const string& keyword = iter->first;
248  const keyword_params& params = iter->second.get_params();
249 
250  result.push_back(keyword_with_params());
251  keyword_with_params& x = result.back();
252  x.keyword = keyword;
253  x.scaling_param = params.scaling_param;
254  x.gamma = params.gamma;
255  }
256 
257  return result;
258  }
259 
261  return get_keyword_list(storages_);
262  }
265  }
266 
267  bool add_document(const string& str, double pos) {
268  bool result = true;
269  for (aggregators_t::iterator iter = aggregators_.begin();
270  iter != aggregators_.end(); ++iter) {
271  const string& keyword = iter->first;
272  aggregate_helper_& a = iter->second;
273  int r = str.find(keyword) != str.npos ? 1 : 0;
274  result = a.add_document(1, r, pos) && result;
275  }
276  return result;
277  }
278 
280  for (aggregators_t::iterator iter = aggregators_.begin();
281  iter != aggregators_.end(); ++iter) {
282  iter->second.calculate_result(options_);
283  }
284  }
285 
286  result_t get_result(const string& keyword) const {
287  const result_storage* s = get_storage_(keyword);
288  if (s == NULL) {
289  return result_t();
290  }
291  return s->get_latest_result();
292  }
293 
294  result_t get_result_at(const string& keyword, double pos) const {
295  const result_storage* s = get_storage_(keyword);
296  if (s == NULL) {
297  return result_t();
298  }
299  return s->get_result_at(pos);
300  }
301 
303  result_map results;
304  for (aggregators_t::const_iterator iter = aggregators_.begin();
305  iter != aggregators_.end(); ++iter) {
306  const result_storage& s = *iter->second.get_storage();
307  result_t result = s.get_latest_result();
308  if (result.is_bursted_at_latest_batch()) {
309  results.insert(std::make_pair(iter->first, result));
310  }
311  }
312  return results;
313  }
314 
316  result_map results;
317  for (aggregators_t::const_iterator iter = aggregators_.begin();
318  iter != aggregators_.end(); ++iter) {
319  const result_storage& s = *iter->second.get_storage();
320  result_t result = s.get_result_at(pos);
321  if (result.is_bursted_at(pos)) {
322  results.insert(std::make_pair(iter->first, result));
323  }
324  }
325  return results;
326  }
327 
328  void get_diff(diff_t& ret) const {
329  shared_ptr<diff_t::impl_> diff(new diff_t::impl_());
330  diff_t::impl_::data_t& data = diff->data;
331 
332  for (aggregators_t::const_iterator iter = aggregators_.begin();
333  iter != aggregators_.end(); ++iter) {
334  const string& keyword = iter->first;
335  diff_t::impl_::entry_t entry = iter->second.get_diff();
336  data.insert(std::make_pair(keyword, entry));
337  }
338 
339  ret.p_ = diff;
340  }
341  bool put_diff(const diff_t& diff) {
342  const diff_t::impl_::data_t& data = diff.p_->data;
343 
344  for (diff_t::impl_::data_t::const_iterator iter = data.begin();
345  iter != data.end(); ++iter) {
346  const std::string& keyword = iter->first;
347  storages_t::iterator found = storages_.find(keyword);
348 
349  if (found == storages_.end()) {
350  const keyword_params params = iter->second.params;
351  std::pair<storages_t::iterator, bool> r =
352  storages_.insert(
353  std::make_pair(keyword, storage_(options_, params)));
354  found = r.first;
355  }
356 
357  found->second.put_diff(iter->second);
358  }
359 
360  std::vector<std::string> to_be_removed;
361 
362  for (aggregators_t::iterator iter = aggregators_.begin();
363  iter != aggregators_.end(); ++iter) {
364  iter->second.tick_removal_count();
365  if (iter->second.is_to_be_removed()) {
366  to_be_removed.push_back(iter->first);
367  }
368  }
369 
370  for (size_t i = 0; i < to_be_removed.size(); ++i) {
371  aggregators_.erase(to_be_removed[i]);
372  }
373 
374  has_been_mixed_ = true;
375  return true;
376  }
377  bool has_been_mixed() const {
378  return has_been_mixed_;
379  }
380 
381  void set_processed_keywords(const std::vector<string>& keywords) {
382  for (aggregators_t::iterator iter = aggregators_.begin();
383  iter != aggregators_.end(); ++iter) {
384  iter->second.set_unprocessed();
385  }
386 
387  for (size_t i = 0; i < keywords.size(); ++i) {
388  aggregators_t::iterator found = aggregators_.find(keywords[i]);
389  if (found != aggregators_.end()) {
390  found->second.set_processed();
391  } else {
392  storages_t::iterator s = storages_.find(keywords[i]);
393  if (s == storages_.end()) {
394  throw JUBATUS_EXCEPTION(
396  "something went wrong in burst"));
397  }
398  aggregators_.insert(
399  std::make_pair(keywords[i],
400  aggregate_helper_(options_, s->second)));
401  }
402  }
403  }
404 
405  void clear() {
406  aggregators_t().swap(aggregators_);
407  storages_t().swap(storages_);
408  }
410  return storage::version();
411  }
412 
414  packer.pack_array(3);
415 
416  packer.pack(options_);
417 
418  packer.pack_map(storages_.size());
419  for (storages_t::const_iterator iter = storages_.begin();
420  iter != storages_.end(); ++iter) {
421  packer.pack(iter->first);
422  packer.pack_array(2);
423  packer.pack(iter->second.get_params());
424  iter->second.get_storage()->pack(packer);
425  }
426 
427  packer.pack_map(aggregators_.size());
428  for (aggregators_t::const_iterator iter = aggregators_.begin();
429  iter != aggregators_.end(); ++iter) {
430  packer.pack(iter->first);
431  iter->second.get_aggregator()->pack(packer);
432  }
433  }
434 
435  void unpack(msgpack::object o) {
436  burst_options unpacked_options = options_;
437  aggregators_t unpacked_aggregators;
438  storages_t unpacked_storages;
439 
440  unpack_impl_(o, unpacked_options, unpacked_aggregators, unpacked_storages);
441 
442  // assign
443  options_ = unpacked_options;
444  storages_.swap(unpacked_storages);
445  aggregators_.swap(unpacked_aggregators);
446  }
447 
448  private:
450  aggregators_t aggregators_;
451  storages_t storages_;
453 
454  const result_storage* get_storage_(const string& keyword) const {
455  storages_t::const_iterator iter = storages_.find(keyword);
456  if (iter == storages_.end()) {
457  return NULL;
458  }
459  return iter->second.get_storage().get();
460  }
461 
462  static void unpack_impl_(msgpack::object o,
463  burst_options& unpacked_options,
464  aggregators_t& unpacked_aggregators,
465  storages_t& unpacked_storages) {
466  if (o.type != msgpack::type::ARRAY || o.via.array.size != 3) {
467  throw msgpack::type_error();
468  }
469 
470  o.via.array.ptr[0].convert(&unpacked_options);
471 
472  {
473  const msgpack::object& m = o.via.array.ptr[1];
474  if (m.type != msgpack::type::MAP) {
475  throw msgpack::type_error();
476  }
477  size_t n = m.via.map.size;
478  for (size_t i = 0; i < n; ++i) {
479  string keyword;
480  m.via.map.ptr[i].key.convert(&keyword);
481 
482  std::pair<keyword_params, msgpack::object> val;
483  m.via.map.ptr[i].val.convert(&val);
484  storage_ s(unpacked_options, val.first);
485  s.get_storage()->unpack(val.second);
486 
487  unpacked_storages.insert(std::make_pair(keyword, s));
488  }
489  }
490 
491  {
492  const msgpack::object& m = o.via.array.ptr[2];
493  if (m.type != msgpack::type::MAP) {
494  throw msgpack::type_error();
495  }
496  size_t n = m.via.map.size;
497  for (size_t i = 0; i < n; ++i) {
498  string keyword;
499  m.via.map.ptr[i].key.convert(&keyword);
500 
501  storages_t::const_iterator iter = unpacked_storages.find(keyword);
502  if (iter == unpacked_storages.end()) {
503  throw msgpack::type_error();
504  }
505 
506  aggregate_helper_ a(unpacked_options, iter->second);
507  a.get_aggregator()->unpack(m.via.map.ptr[i].val);
508 
509  unpacked_aggregators.insert(std::make_pair(keyword, a));
510  }
511  }
512  }
513 };
514 
516  : p_(new impl_(options)) {
517 }
518 
519 burst::~burst() {
520 }
521 
522 bool burst::add_keyword(const string& keyword,
523  const keyword_params& params,
524  bool processed_in_this_server) {
526  return p_->add_keyword(keyword, params, processed_in_this_server);
527 }
528 
529 bool burst::remove_keyword(const string& keyword) {
531  return p_->remove_keyword(keyword);
532 }
533 
534 bool burst::remove_all_keywords() {
536  return p_->remove_all_keywords();
537 }
538 
539 burst::keyword_list burst::get_all_keywords() const {
541  return p_->get_all_keywords();
542 }
543 
544 burst::keyword_list burst::get_processed_keywords() const {
546  return p_->get_processed_keywords();
547 }
548 
549 bool burst::add_document(const string& str, double pos) {
551  return p_->add_document(str, pos);
552 }
553 
554 void burst::calculate_results() {
556  p_->calculate_results();
557 }
558 
559 burst::result_t burst::get_result(const std::string& keyword) const {
561  return p_->get_result(keyword);
562 }
563 burst::result_t burst::get_result_at(
564  const std::string& keyword, double pos) const {
566  return p_->get_result_at(keyword, pos);
567 }
568 burst::result_map burst::get_all_bursted_results() const {
570  return p_->get_all_bursted_results();
571 }
572 burst::result_map burst::get_all_bursted_results_at(double pos) const {
574  return p_->get_all_bursted_results_at(pos);
575 }
576 
577 void burst::set_processed_keywords(const std::vector<string>& keywords) {
579  return p_->set_processed_keywords(keywords);
580 }
581 
582 void burst::diff_t::mix(const diff_t& mixed) {
583  if (!p_) {
584  p_ = mixed.p_;
585  } else if (!mixed.p_) {
586  // do nothing
587  } else {
588  p_.reset(new impl_(*p_, *mixed.p_));
589  }
590 }
592  if (!p_) {
593  packer.pack_array(0);
594  } else {
595  p_->msgpack_pack(packer);
596  }
597 }
598 void burst::diff_t::msgpack_unpack(msgpack::object o) {
599  p_.reset(new impl_(o));
600 }
601 void burst::get_diff(diff_t& ret) const {
603  p_->get_diff(ret);
604 }
605 bool burst::put_diff(const diff_t& diff) {
607  return p_->put_diff(diff);
608 }
609 bool burst::has_been_mixed() const {
611  return p_->has_been_mixed();
612 }
613 
614 void burst::clear() {
616  p_->clear();
617 }
620  return p_->get_version();
621 }
624  p_->pack(packer);
625 }
626 void burst::unpack(msgpack::object o) {
628  p_->unpack(o);
629 }
630 
631 } // namespace burst
632 } // namespace core
633 } // namespace jubatus
result_t get_result_at(double pos) const
storage::version get_version() const
Definition: burst.cpp:618
#define JUBATUS_ASSERT_EQ(a, b, messages)
Definition: assert.hpp:63
storage_(const burst_options &options, const keyword_params &params)
Definition: burst.cpp:79
shared_ptr< result_storage > s_
Definition: burst.cpp:98
const shared_ptr< aggregator > & get_aggregator() const
Definition: burst.cpp:133
bool remove_keyword(const string &keyword)
Definition: burst.cpp:230
jubatus::util::lang::shared_ptr< const impl_ > p_
Definition: burst.hpp:100
impl_(const impl_ &x, const impl_ &y)
Definition: burst.cpp:54
burst_result result_t
Definition: burst.hpp:72
const shared_ptr< result_storage > & get_storage() const
Definition: burst.cpp:137
void set_processed_keywords(const std::vector< string > &keywords)
Definition: burst.cpp:381
int survival_mix_count_from_set_unprocessed
Definition: burst.cpp:39
jubatus::util::lang::scoped_ptr< impl_ > p_
Definition: burst.hpp:124
void pack(framework::packer &packer) const
Definition: burst.cpp:622
bool add_document(const string &str, double pos)
Definition: burst.cpp:267
void pack(framework::packer &packer) const
Definition: burst.cpp:413
bool is_bursted_at(double pos) const
keyword_list get_all_keywords() const
Definition: burst.cpp:260
void msgpack_pack(framework::packer &packer) const
Definition: burst.cpp:591
#define JUBATUS_ASSERT(expr)
Definition: assert.hpp:55
#define JUBATUS_EXCEPTION(e)
Definition: exception.hpp:79
void put_diff(const diff_t::impl_::entry_t &diff) const
Definition: burst.cpp:85
bool add_document(int d, int r, double pos) const
Definition: burst.cpp:114
bool put_diff(const diff_t &diff)
Definition: burst.cpp:341
const result_storage * get_storage_(const string &keyword) const
Definition: burst.cpp:454
diff_t::impl_::entry_t get_diff() const
Definition: burst.cpp:126
result_map get_all_bursted_results() const
Definition: burst.cpp:302
void calculate_result(const burst_options &options) const
Definition: burst.cpp:118
void msgpack_unpack(msgpack::object o)
Definition: burst.cpp:598
impl_(const burst_options &options)
Definition: burst.cpp:176
keyword_list get_processed_keywords() const
Definition: burst.cpp:263
void unpack(msgpack::object o)
Definition: burst.cpp:626
bool has_been_mixed() const
Definition: burst.cpp:609
bool put_diff(const diff_t &)
Definition: burst.cpp:605
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
storage::version get_version() const
Definition: burst.cpp:409
static void unpack_impl_(msgpack::object o, burst_options &unpacked_options, aggregators_t &unpacked_aggregators, storages_t &unpacked_storages)
Definition: burst.cpp:462
static keyword_list get_keyword_list(const Map &m)
Definition: burst.cpp:241
result_t get_result(const string &keyword) const
Definition: burst.cpp:286
const keyword_params & get_params() const
Definition: burst.cpp:93
void get_diff(diff_t &ret) const
Definition: burst.cpp:601
unordered_map< string, aggregate_helper_ > aggregators_t
Definition: burst.cpp:173
result_map get_all_bursted_results_at(double pos) const
Definition: burst.cpp:315
unordered_map< string, entry_t > data_t
Definition: burst.cpp:48
const keyword_params & get_params() const
Definition: burst.cpp:141
const shared_ptr< result_storage > & get_storage() const
Definition: burst.cpp:89
bool add_keyword(const string &keyword, const keyword_params &params, bool processed_in_this_server)
Definition: burst.cpp:200
void unpack(msgpack::object o)
Definition: burst.cpp:435
void get_diff(diff_t &ret) const
Definition: burst.cpp:328
result_t get_result_at(const string &keyword, double pos) const
Definition: burst.cpp:294
jubatus::util::data::unordered_map< std::string, result_t > result_map
Definition: burst.hpp:73
aggregate_helper_(const burst_options &options, const storage_ &s)
Definition: burst.cpp:104
unordered_map< string, storage_ > storages_t
Definition: burst.cpp:172
std::vector< keyword_with_params > keyword_list
Definition: burst.hpp:74