23 #include "../common/assert.hpp"
33 impl_(
int window_batch_size,
double batch_interval,
int max_stored)
40 typedef std::deque<input_window>::iterator iterator_t;
45 }
else if (
inputs_.front().get_end_pos() <= pos) {
48 inputs_.front().swap(new_window);
56 iter != end; ++iter) {
57 if (iter->add_document(d, r, pos)) {
68 double costcut_threshold,
69 int max_reuse_batches,
78 typedef std::deque<input_window>::reverse_iterator iterator_t;
81 iter != end; ++iter) {
82 burst_result r(*iter, scaling_param, gamma, costcut_threshold,
83 prev, max_reuse_batches);
93 std::pair<int, int> intersection =
95 if (intersection.first != intersection.second) {
116 int i =
static_cast<int>(
117 std::floor((pos - prev_start_pos) / batch_interval_));
118 int j = i - window_batch_size_/2;
119 double new_start_pos = prev_start_pos + batch_interval_ * j;
122 new_start_pos, batch_interval_, window_batch_size_);
126 for (
int i = 0, j = intersection.first;
127 j < intersection.second;
138 int window_batch_size,
double batch_interval,
int max_stored)
139 : p_(new
impl_(window_batch_size, batch_interval, max_stored)) {
147 return p_->add_document(d, r, pos);
152 double costcut_threshold,
153 int max_reuse_batches,
156 return p_->flush_results(scaling_param, gamma, costcut_threshold,
157 max_reuse_batches, stored);
result_t get_result_at(double pos) const
impl_(int window_batch_size, double batch_interval, int max_stored)
std::deque< input_window > inputs_
void unpack(msgpack::object o)
batch_type & get_batch_by_index(size_t i)
bool add_document(int d, int r, double pos)
jubatus::util::lang::scoped_ptr< impl_ > p_
int flush_results(double scaling_param, double gamma, double costcut_threshold, int max_reuse_batches, result_storage &stored)
#define JUBATUS_ASSERT(expr)
void store(const result_t &result)
#define JUBATUS_ASSERT_GT(a, b, messages)
void pack(framework::packer &packer) const
aggregator(int window_batch_size, double batch_interval, int max_stored)
input_window make_new_window_(double pos, const input_window &prev) const
std::pair< int, int > get_intersection(const W1 &w1, const W2 &w2)
msgpack::packer< jubatus_packer > packer
MSGPACK_DEFINE(inputs_, window_batch_size_, batch_interval_, max_stored_)
bool add_document(int d, int r, double pos)
double get_start_pos() const
void swap(burst_result &x)
#define JUBATUS_ASSERT_LT(a, b, messages)
int flush_results(double scaling_param, double gamma, double costcut_threshold, int max_reuse_batches, result_storage &stored)