jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
aggregator.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2014 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #include "aggregator.hpp"
18 
19 #include <utility>
20 #include <deque>
21 #include <vector>
22 
23 #include "../common/assert.hpp"
24 #include "input_window.hpp"
25 #include "window_intersection.hpp"
26 
27 namespace jubatus {
28 namespace core {
29 namespace burst {
30 
32  public:
33  impl_(int window_batch_size, double batch_interval, int max_stored)
34  : window_batch_size_(window_batch_size),
35  batch_interval_(batch_interval),
36  max_stored_(max_stored) {
37  }
38 
39  bool add_document(int d, int r, double pos) {
40  typedef std::deque<input_window>::iterator iterator_t;
41 
42  if (inputs_.empty()) {
43  inputs_.push_front(
45  } else if (inputs_.front().get_end_pos() <= pos) {
46  input_window new_window = make_new_window_(pos, inputs_[0]);
47  inputs_.push_front(input_window());
48  inputs_.front().swap(new_window); // move semantics
49  while (inputs_.size() > static_cast<size_t>(max_stored_)) {
50  inputs_.pop_back();
51  }
52  }
53 
54  bool added = false;
55  for (iterator_t iter = inputs_.begin(), end = inputs_.end();
56  iter != end; ++iter) {
57  if (iter->add_document(d, r, pos)) {
58  added = true;
59  } else {
60  return added;
61  }
62  }
63  return added;
64  }
65 
66  int flush_results(double scaling_param,
67  double gamma,
68  double costcut_threshold,
69  int max_reuse_batches,
70  result_storage& stored) {
71  if (inputs_.empty()) {
72  return 0;
73  }
74 
75  burst_result prev = stored.get_result_at(
76  inputs_.back().get_start_pos() - batch_interval_/2);
77 
78  typedef std::deque<input_window>::reverse_iterator iterator_t;
79 
80  for (iterator_t iter = inputs_.rbegin(), end = inputs_.rend();
81  iter != end; ++iter) {
82  burst_result r(*iter, scaling_param, gamma, costcut_threshold,
83  prev, max_reuse_batches);
84  stored.store(r);
85  prev.swap(r); // more efficient than prev = r; use move when C++11/14
86  }
87 
88  // erase inputs which will no longer be modified by add_document
89  int n = 0;
90  for (;;) {
91  JUBATUS_ASSERT_GT(inputs_.size(), 0, "");
92 
93  std::pair<int, int> intersection =
94  get_intersection(inputs_.back(), inputs_.front());
95  if (intersection.first != intersection.second) {
96  break; // break if intersection exists
97  }
98 
99  inputs_.pop_back();
100  ++n;
101  }
102  // return erased count
103  return n;
104  }
105 
107 
108  private:
109  std::deque<input_window> inputs_;
113 
114  input_window make_new_window_(double pos, const input_window& prev) const {
115  double prev_start_pos = prev.get_start_pos();
116  int i = static_cast<int>(
117  std::floor((pos - prev_start_pos) / batch_interval_));
118  int j = i - window_batch_size_/2;
119  double new_start_pos = prev_start_pos + batch_interval_ * j;
120 
121  input_window new_window(
122  new_start_pos, batch_interval_, window_batch_size_);
123 
124  // fill new_window's d&r vector
125  std::pair<int, int> intersection = get_intersection(prev, new_window);
126  for (int i = 0, j = intersection.first;
127  j < intersection.second;
128  ++i, ++j) {
129  JUBATUS_ASSERT_LT(i, window_batch_size_, "");
130  new_window.get_batch_by_index(i) = prev.get_batch_by_index(j);
131  }
132 
133  return new_window; // NRVO
134  }
135 };
136 
138  int window_batch_size, double batch_interval, int max_stored)
139  : p_(new impl_(window_batch_size, batch_interval, max_stored)) {
140 }
141 
143 }
144 
145 bool aggregator::add_document(int d, int r, double pos) {
147  return p_->add_document(d, r, pos);
148 }
149 
150 int aggregator::flush_results(double scaling_param,
151  double gamma,
152  double costcut_threshold,
153  int max_reuse_batches,
154  result_storage& stored) {
156  return p_->flush_results(scaling_param, gamma, costcut_threshold,
157  max_reuse_batches, stored);
158 }
159 
162  packer.pack(*p_);
163 }
164 
165 void aggregator::unpack(msgpack::object o) {
167  o.convert(p_.get());
168 }
169 
170 } // namespace burst
171 } // namespace core
172 } // namespace jubatus
result_t get_result_at(double pos) const
impl_(int window_batch_size, double batch_interval, int max_stored)
Definition: aggregator.cpp:33
std::deque< input_window > inputs_
Definition: aggregator.cpp:109
void unpack(msgpack::object o)
Definition: aggregator.cpp:165
batch_type & get_batch_by_index(size_t i)
bool add_document(int d, int r, double pos)
Definition: aggregator.cpp:39
jubatus::util::lang::scoped_ptr< impl_ > p_
Definition: aggregator.hpp:52
int flush_results(double scaling_param, double gamma, double costcut_threshold, int max_reuse_batches, result_storage &stored)
Definition: aggregator.cpp:150
#define JUBATUS_ASSERT(expr)
Definition: assert.hpp:55
void store(const result_t &result)
#define JUBATUS_ASSERT_GT(a, b, messages)
Definition: assert.hpp:73
void pack(framework::packer &packer) const
Definition: aggregator.cpp:160
aggregator(int window_batch_size, double batch_interval, int max_stored)
Definition: aggregator.cpp:137
input_window make_new_window_(double pos, const input_window &prev) const
Definition: aggregator.cpp:114
std::pair< int, int > get_intersection(const W1 &w1, const W2 &w2)
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
MSGPACK_DEFINE(inputs_, window_batch_size_, batch_interval_, max_stored_)
bool add_document(int d, int r, double pos)
Definition: aggregator.cpp:145
#define JUBATUS_ASSERT_LT(a, b, messages)
Definition: assert.hpp:69
int flush_results(double scaling_param, double gamma, double costcut_threshold, int max_reuse_batches, result_storage &stored)
Definition: aggregator.cpp:66