jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
result_storage.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2014 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #include "result_storage.hpp"
18 
19 #include <stddef.h>
20 #include <cfloat>
21 #include <deque>
22 
23 #include "../common/assert.hpp"
24 
25 namespace jubatus {
26 namespace core {
27 namespace burst {
28 
30  typedef std::deque<result_t> results_t;
31  results_t results_;
32  size_t results_max_;
34 
35  public:
36  explicit impl_(int stored_results_max)
37  : results_max_(stored_results_max), oldest_start_pos_not_mixed_(DBL_MAX) {
38  }
39 
40  void store(const result_t& result, bool merge = false) {
41  double result_start_pos = result.get_start_pos();
42 
43  if (results_.empty()) {
44  results_.push_front(result);
45  } else {
46  typedef results_t::iterator iterator_t;
47  for (iterator_t iter = results_.begin(), end = results_.end();
48  iter != end; ++iter) {
49  if (iter->has_start_pos_older_than(result_start_pos)) {
50  results_.insert(iter, result);
51  break;
52  } else if (iter->has_same_start_pos_to(result_start_pos)) {
54  iter->get_batch_size(), result.get_batch_size(), "");
55  JUBATUS_ASSERT(iter->has_same_batch_interval(result));
56  if (merge) {
57  bool mixed = iter->mix(result);
58  JUBATUS_ASSERT(mixed);
59  } else {
60  *iter = result; // update with new result
61  }
62  break;
63  }
64  }
65  }
66 
67  if (result_start_pos < oldest_start_pos_not_mixed_) {
68  oldest_start_pos_not_mixed_ = result_start_pos;
69  }
70 
71  while (results_.size() > results_max_) {
72  results_.pop_back();
73  }
74  }
75 
77  if (results_.empty()) {
78  return result_t();
79  }
80  return results_.front();
81  }
82  result_t get_result_at(double pos) const {
83  typedef results_t::const_iterator iterator_t;
84  for (iterator_t iter = results_.begin(), end = results_.end();
85  iter != end; ++iter) {
86  if (iter->contains(pos)) {
87  return *iter;
88  }
89  }
90  return result_t();
91  }
92 
93  diff_t get_diff() const {
94  diff_t diff;
95 
96  typedef results_t::const_iterator iterator_t;
97  for (iterator_t iter = results_.begin(), end = results_.end();
98  iter != end; ++iter) {
99  if (iter->has_start_pos_older_than(oldest_start_pos_not_mixed_)) {
100  break;
101  }
102  diff.push_back(*iter);
103  }
104 
105  return diff;
106  }
107 
108  void put_diff(const diff_t& diff) {
109  // merge diff
110  for (diff_t::const_iterator iter = diff.begin();
111  iter != diff.end(); ++iter) {
112  store(*iter, true);
113  }
114 
115  // clear diff
116  oldest_start_pos_not_mixed_ = DBL_MAX;
117  }
118 
119  MSGPACK_DEFINE(results_, results_max_, oldest_start_pos_not_mixed_);
120 };
121 
122 result_storage::result_storage(int stored_results_max)
123  : p_(new impl_(stored_results_max)) {
124 }
125 
127 }
128 
129 void result_storage::store(const result_t& result) {
131  p_->store(result);
132 }
133 
136  return p_->get_latest_result();
137 }
138 
141  return p_->get_result_at(pos);
142 }
143 
146  return p_->get_diff();
147 }
148 
151  p_->put_diff(diff);
152 }
153 
156  packer.pack(*p_);
157 }
158 
159 void result_storage::unpack(msgpack::object o) {
161  o.convert(p_.get());
162 }
163 
164 } // namespace burst
165 } // namespace core
166 } // namespace jubatus
result_t get_result_at(double pos) const
#define JUBATUS_ASSERT_EQ(a, b, messages)
Definition: assert.hpp:63
void store(const result_t &result, bool merge=false)
MSGPACK_DEFINE(results_, results_max_, oldest_start_pos_not_mixed_)
jubatus::util::lang::scoped_ptr< impl_ > p_
#define JUBATUS_ASSERT(expr)
Definition: assert.hpp:55
void store(const result_t &result)
result_storage(int stored_results_max)
result_t get_result_at(double pos) const
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
void pack(framework::packer &packer) const