jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
Public Member Functions | Private Member Functions | Private Attributes | List of all members
jubatus::core::burst::aggregator::impl_ Class Reference
Collaboration diagram for jubatus::core::burst::aggregator::impl_:
Collaboration graph

Public Member Functions

bool add_document (int d, int r, double pos)
 
int flush_results (double scaling_param, double gamma, double costcut_threshold, int max_reuse_batches, result_storage &stored)
 
 impl_ (int window_batch_size, double batch_interval, int max_stored)
 
 MSGPACK_DEFINE (inputs_, window_batch_size_, batch_interval_, max_stored_)
 

Private Member Functions

input_window make_new_window_ (double pos, const input_window &prev) const
 

Private Attributes

double batch_interval_
 
std::deque< input_windowinputs_
 
int max_stored_
 
int window_batch_size_
 

Detailed Description

Definition at line 31 of file aggregator.cpp.

Constructor & Destructor Documentation

jubatus::core::burst::aggregator::impl_::impl_ ( int  window_batch_size,
double  batch_interval,
int  max_stored 
)
inline

Definition at line 33 of file aggregator.cpp.

34  : window_batch_size_(window_batch_size),
35  batch_interval_(batch_interval),
36  max_stored_(max_stored) {
37  }

Member Function Documentation

bool jubatus::core::burst::aggregator::impl_::add_document ( int  d,
int  r,
double  pos 
)
inline

Definition at line 39 of file aggregator.cpp.

References batch_interval_, inputs_, make_new_window_(), and max_stored_.

39  {
40  typedef std::deque<input_window>::iterator iterator_t;
41 
42  if (inputs_.empty()) {
43  inputs_.push_front(
44  make_new_window_(pos, input_window(0, batch_interval_, 0)));
45  } else if (inputs_.front().get_end_pos() <= pos) {
46  input_window new_window = make_new_window_(pos, inputs_[0]);
47  inputs_.push_front(input_window());
48  inputs_.front().swap(new_window); // move semantics
49  while (inputs_.size() > static_cast<size_t>(max_stored_)) {
50  inputs_.pop_back();
51  }
52  }
53 
54  bool added = false;
55  for (iterator_t iter = inputs_.begin(), end = inputs_.end();
56  iter != end; ++iter) {
57  if (iter->add_document(d, r, pos)) {
58  added = true;
59  } else {
60  return added;
61  }
62  }
63  return added;
64  }
std::deque< input_window > inputs_
Definition: aggregator.cpp:109
input_window make_new_window_(double pos, const input_window &prev) const
Definition: aggregator.cpp:114

Here is the call graph for this function:

int jubatus::core::burst::aggregator::impl_::flush_results ( double  scaling_param,
double  gamma,
double  costcut_threshold,
int  max_reuse_batches,
result_storage stored 
)
inline

Definition at line 66 of file aggregator.cpp.

References batch_interval_, jubatus::core::burst::get_intersection(), jubatus::core::burst::result_storage::get_result_at(), inputs_, JUBATUS_ASSERT_GT, jubatus::core::burst::result_storage::store(), and jubatus::core::burst::burst_result::swap().

70  {
71  if (inputs_.empty()) {
72  return 0;
73  }
74 
75  burst_result prev = stored.get_result_at(
76  inputs_.back().get_start_pos() - batch_interval_/2);
77 
78  typedef std::deque<input_window>::reverse_iterator iterator_t;
79 
80  for (iterator_t iter = inputs_.rbegin(), end = inputs_.rend();
81  iter != end; ++iter) {
82  burst_result r(*iter, scaling_param, gamma, costcut_threshold,
83  prev, max_reuse_batches);
84  stored.store(r);
85  prev.swap(r); // more efficient than prev = r; use move when C++11/14
86  }
87 
88  // erase inputs which will no longer be modified by add_document
89  int n = 0;
90  for (;;) {
91  JUBATUS_ASSERT_GT(inputs_.size(), 0, "");
92 
93  std::pair<int, int> intersection =
94  get_intersection(inputs_.back(), inputs_.front());
95  if (intersection.first != intersection.second) {
96  break; // break if intersection exists
97  }
98 
99  inputs_.pop_back();
100  ++n;
101  }
102  // return erased count
103  return n;
104  }
std::deque< input_window > inputs_
Definition: aggregator.cpp:109
#define JUBATUS_ASSERT_GT(a, b, messages)
Definition: assert.hpp:73
std::pair< int, int > get_intersection(const W1 &w1, const W2 &w2)

Here is the call graph for this function:

input_window jubatus::core::burst::aggregator::impl_::make_new_window_ ( double  pos,
const input_window prev 
) const
inlineprivate

Definition at line 114 of file aggregator.cpp.

References jubatus::core::burst::basic_window< Batch >::get_batch_by_index(), jubatus::core::burst::get_intersection(), jubatus::core::burst::basic_window< Batch >::get_start_pos(), and JUBATUS_ASSERT_LT.

Referenced by add_document().

114  {
115  double prev_start_pos = prev.get_start_pos();
116  int i = static_cast<int>(
117  std::floor((pos - prev_start_pos) / batch_interval_));
118  int j = i - window_batch_size_/2;
119  double new_start_pos = prev_start_pos + batch_interval_ * j;
120 
121  input_window new_window(
122  new_start_pos, batch_interval_, window_batch_size_);
123 
124  // fill new_window's d&r vector
125  std::pair<int, int> intersection = get_intersection(prev, new_window);
126  for (int i = 0, j = intersection.first;
127  j < intersection.second;
128  ++i, ++j) {
130  new_window.get_batch_by_index(i) = prev.get_batch_by_index(j);
131  }
132 
133  return new_window; // NRVO
134  }
std::pair< int, int > get_intersection(const W1 &w1, const W2 &w2)
#define JUBATUS_ASSERT_LT(a, b, messages)
Definition: assert.hpp:69

Here is the call graph for this function:

Here is the caller graph for this function:

jubatus::core::burst::aggregator::impl_::MSGPACK_DEFINE ( inputs_  ,
window_batch_size_  ,
batch_interval_  ,
max_stored_   
)

Member Data Documentation

double jubatus::core::burst::aggregator::impl_::batch_interval_
private

Definition at line 111 of file aggregator.cpp.

Referenced by add_document(), and flush_results().

std::deque<input_window> jubatus::core::burst::aggregator::impl_::inputs_
private

Definition at line 109 of file aggregator.cpp.

Referenced by add_document(), and flush_results().

int jubatus::core::burst::aggregator::impl_::max_stored_
private

Definition at line 112 of file aggregator.cpp.

Referenced by add_document().

int jubatus::core::burst::aggregator::impl_::window_batch_size_
private

Definition at line 110 of file aggregator.cpp.


The documentation for this class was generated from the following file: