jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
engine.cpp
// Jubatus: Online machine learning framework for distributed environment
// Copyright (C) 2014 Preferred Networks and Nippon Telegraph and Telephone Corporation.
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License version 2.1 as published by the Free Software Foundation.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

#include "engine.hpp"

// TODO(gintenlabo): Refactor

#include <math.h> // for ::lgamma (std::lgamma is not provided in C++03)
#include <cmath>
#include <cfloat>
#include <algorithm>
#include <numeric>
#include <utility>
#include <vector>

#include "../common/exception.hpp"

namespace jubatus {
namespace core {
namespace burst {

namespace {

// In enumerating burst detection, the automaton has the following two states:
const int kStatesNum = 2;
// - base state
const int kBaseState = 0;
// - burst state
const int kBurstState = 1;

// Sentinel weight for batches whose weight has not been calculated yet;
// negative weights are discarded by erase_uncalc_batches() below.
const double kDefaultBatchWeight = -1;

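// Expected rate of relevant documents in each state, estimated from the
// current window: p[base] = R / D (the overall relevant fraction), and
// p[burst] = scaling_param * p[base].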
std::vector<double> get_p_vector(
    const std::vector<uint32_t>& d_vector,
    const std::vector<uint32_t>& r_vector,
    double scaling_param) {
  std::vector<double> ret(kStatesNum);
  // D := \sum d
  const int32_t D = std::accumulate(d_vector.begin(), d_vector.end(), 0);
  // R := \sum r
  const int32_t R = std::accumulate(r_vector.begin(), r_vector.end(), 0);
  ret[kBaseState] = static_cast<double>(R) / static_cast<double>(D);
  ret[kBurstState] = scaling_param * ret[kBaseState];
  return ret;
}

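// Transition cost between automaton states: moving "up" from state i to
// state j costs (j - i) * gamma * ln(window_size); staying or moving down
// costs nothing.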
double tau(int i, int j, double gamma, int window_size) {
  if (i >= j) {
    return 0;
  }
  return (j - i) * gamma * std::log(window_size);
}

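// log(n!) via the log-gamma function, and the log of the binomial
// coefficient C(n, k); log_choose returns -INFINITY outside its domain.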
double log_factorial(int i) {
  return ::lgamma(i + 1);
}

double log_choose(int n, int k) {
  if (n < 0 || k < 0 || n < k) {
    return -INFINITY;
  }
  return log_factorial(n) - log_factorial(k) - log_factorial(n - k);
}

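// Cost of emitting a batch in a state with relevant-document rate p:
//   sigma(p, d, r) = -ln( C(d, r) * p^r * (1 - p)^(d - r) ),
// i.e. the negative log-likelihood of observing r relevant documents out
// of d under a binomial model with parameter p.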
double sigma(double p, uint32_t d, uint32_t r) {
  double ret = log_choose(d, r);
  ret += r * std::log(p);
  ret += (d - r) * std::log(1 - p);
  return -ret;
}

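// Weight of a bursting batch: how much cheaper it is to explain the batch
// with the burst-state rate than with the base-state rate, clamped at 0.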
double get_batch_weight(
    const std::vector<uint32_t>& d_vector,
    const std::vector<uint32_t>& r_vector,
    const std::vector<double>& p_vector,
    int batch_id) {
  double ret =
      sigma(p_vector[kBaseState], d_vector[batch_id], r_vector[batch_id]) -
      sigma(p_vector[kBurstState], d_vector[batch_id], r_vector[batch_id]);
  return (ret > 0) ? ret : 0;
}

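// One step of the dynamic-programming (Viterbi-style) minimization: given
// the optimal costs of ending the previous batch in the base and burst
// states, pick the previous state that reaches now_state with the smaller
// accumulated cost (including the transition cost tau).
// Returns (previous state, accumulated cost).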
std::pair<int, double> calc_previous_optimal_state(
    int now_state,
    double prev_base_optimal_cost,
    double prev_burst_optimal_cost,
    double gamma,
    int window_size) {
  // [previous] base state (optimal) -> [now] now_state
  const double prev_base_optimal_to_now_state_cost
      = prev_base_optimal_cost
        + tau(kBaseState, now_state, gamma, window_size);
  // [previous] burst state (optimal) -> [now] now_state
  const double prev_burst_optimal_to_now_state_cost
      = prev_burst_optimal_cost
        + tau(kBurstState, now_state, gamma, window_size);

  int prev_optimal_state = kBaseState;
  double prev_optimal_in_now_state_cost
      = prev_base_optimal_to_now_state_cost;
  if (prev_base_optimal_to_now_state_cost
      > prev_burst_optimal_to_now_state_cost) {
    prev_optimal_state = kBurstState;
    prev_optimal_in_now_state_cost
        = prev_burst_optimal_to_now_state_cost;
  }

  return std::make_pair(prev_optimal_state,
                        prev_optimal_in_now_state_cost);
}

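// Branch cutting: if, for this batch, the burst state is more expensive
// than the base state by more than burst_cut_threshold * ln(window_size),
// the burst branch is pruned and the search continues from the base state.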
bool check_branch_cuttable(
    const std::vector<uint32_t>& d_vector,
    const std::vector<uint32_t>& r_vector,
    const std::vector<double>& p_vector,
    int batch_id,
    double burst_cut_threshold) {
  const int window_size = d_vector.size();

  if (sigma(p_vector[kBurstState],
            d_vector[batch_id],
            r_vector[batch_id])
      - sigma(p_vector[kBaseState],
              d_vector[batch_id],
              r_vector[batch_id])
      > burst_cut_threshold * std::log(window_size)) {
    return true;
  }
  return false;
}

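// Batch weights that are still negative (e.g. kDefaultBatchWeight) mark
// batches whose weight has not been calculated yet; erase_uncalc_batches
// drops them so that only already-computed weights remain.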
struct is_negative {
  bool operator()(double x) const {
    return x < 0;
  }
};

void erase_uncalc_batches(std::vector<double>& batch_weights) {
  std::vector<double>::iterator iter = std::remove_if(
      batch_weights.begin(), batch_weights.end(), is_negative());
  batch_weights.erase(iter, batch_weights.end());
}

} // namespace

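// Detects bursts over a window of batches. d_vector holds the total number
// of documents per batch and r_vector the number of relevant documents per
// batch; batch_weights is filled with one non-negative weight per batch
// (0 for non-bursting batches). Non-negative weights already stored in
// batch_weights are treated as results for the oldest batches and reused.
//
// Illustrative call (a sketch, not taken from the original sources):
//   std::vector<uint32_t> d(5, 10), r(5, 1);
//   r[4] = 8;  // the last batch contains many relevant documents
//   std::vector<double> weights;  // empty: nothing to reuse
//   burst_detect(d, r, weights, 2.0, 1.0, 3.0);
//   // weights.size() == 5; a positive weights[4] indicates a burst there.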
void burst_detect(const std::vector<uint32_t>& d_vector,
                  const std::vector<uint32_t>& r_vector,
                  std::vector<double>& batch_weights,
                  double scaling_param,
                  double gamma,
                  double burst_cut_threshold) {
  const int window_size = d_vector.size();
  if (gamma <= 0) {
    throw JUBATUS_EXCEPTION(
        common::invalid_parameter("gamma must be > 0."));
  }
  if (scaling_param <= 1) {
    throw JUBATUS_EXCEPTION(
        common::invalid_parameter("scaling_param must be > 1."));
  }
  if (burst_cut_threshold <= 0) {
    throw JUBATUS_EXCEPTION(
        common::invalid_parameter("burst_cut_threshold must be > 0."));
  }
  if (d_vector.size() != r_vector.size()) {
    throw JUBATUS_EXCEPTION(
        common::invalid_parameter("d_vector.size() != r_vector.size()"));
  }
  for (int batch_id = 0; batch_id < window_size; batch_id++) {
    if (d_vector[batch_id] < r_vector[batch_id]) {
      throw JUBATUS_EXCEPTION(
          common::invalid_parameter(
              "d_vector[batch_id] < r_vector[batch_id]"));
    }
  }
  const std::vector<double> p_vector
      = get_p_vector(d_vector, r_vector, scaling_param);

  erase_uncalc_batches(batch_weights);

  // special-case handling of
  // - "p_{burst_state} > 1"
  // - "p_{base_state} = 0"
  if (1 < p_vector[kBurstState]) {
    batch_weights.resize(window_size, INFINITY);
    return;
  } else if (p_vector[kBaseState] == 0) {
    batch_weights.resize(window_size, 0);
    return;
  }

  const int reuse_batch_size = batch_weights.size();

  // the optimal cost values
  // - index 0: [1st batch - prev batch] optimal seq -> [now] base
  // - index 1: [1st batch - prev batch] optimal seq -> [now] burst
  double prev_optimal_in_now_states_costs[] = {-1, -1};

  // the optimal cost values from the 1st batch to the previous batch.
  // - index 0: previous : base
  // - index 1: previous : burst
  // To avoid the burst state in the 1st batch,
  // set the cost of the burst state to INFINITY.
  double prev_optimal_costs[] = {0, INFINITY};
  if (batch_weights.size() != 0 && 0 < batch_weights.back()) {
    // The last reused batch was bursting; to avoid the base state in the
    // 1st batch, set the cost of the base state to INFINITY instead.
    prev_optimal_costs[kBaseState] = INFINITY;
    prev_optimal_costs[kBurstState] = 0;
  }

  // state sequences from the 1st batch to the current batch.
  // - index 0: [1st batch - prev batch] optimal seq -> [now] base
  // - index 1: [1st batch - prev batch] optimal seq -> [now] burst
  std::vector<std::vector<int> > prev_optimal_in_now_states_seq(kStatesNum);

  // state sequences from the 1st batch to the previous batch.
  // - index 0: [1st batch - prev batch] optimal seq & [prev] base
  // - index 1: [1st batch - prev batch] optimal seq & [prev] burst
  std::vector<std::vector<int> > prev_optimal_states_seq(kStatesNum);

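  // Forward pass over the batches that have no reusable weight: for each
  // new batch and each candidate state, extend the cheaper of the two
  // previous optimal sequences; if the previous batch was empty (d = 0) or
  // its burst branch was cut, the previous state is forced to base.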
  for (int update_batch_id = 0;
       update_batch_id < window_size - reuse_batch_size;
       update_batch_id++) {
    for (int now_state = kBaseState; now_state < kStatesNum; now_state++) {
      std::pair<int, double> prev_optimal_pair;

      if ((0 < update_batch_id + reuse_batch_size) &&
          (d_vector[update_batch_id + reuse_batch_size - 1] == 0)) {
        // special case: the previous batch had (d, r) = (0, 0),
        // so continue from the base state
        prev_optimal_pair.first = kBaseState;
        prev_optimal_pair.second =
            prev_optimal_costs[kBaseState] +
            tau(kBaseState, now_state, gamma, window_size);
      } else if (0 < update_batch_id + reuse_batch_size &&
                 check_branch_cuttable(d_vector, r_vector, p_vector,
                                       update_batch_id + reuse_batch_size - 1,
                                       burst_cut_threshold)) {
        // the burst branch of the previous batch has been cut,
        // so continue from the base state
        prev_optimal_pair.first = kBaseState;
        prev_optimal_pair.second =
            prev_optimal_costs[kBaseState] +
            tau(kBaseState, now_state, gamma, window_size);
      } else {
        prev_optimal_pair =
            calc_previous_optimal_state(now_state,
                                        prev_optimal_costs[kBaseState],
                                        prev_optimal_costs[kBurstState],
                                        gamma, window_size);
      }

      prev_optimal_in_now_states_costs[now_state] =
          prev_optimal_pair.second +
          sigma(p_vector[now_state],
                d_vector[update_batch_id + reuse_batch_size],
                r_vector[update_batch_id + reuse_batch_size]);

      prev_optimal_in_now_states_seq[now_state] =
          prev_optimal_states_seq[prev_optimal_pair.first];
      prev_optimal_in_now_states_seq[now_state].push_back(now_state);
    }

    //
    // prepare for processing the next batch.
    //
    for (int state = kBaseState; state < kStatesNum; state++) {
      prev_optimal_costs[state] = prev_optimal_in_now_states_costs[state];
      prev_optimal_states_seq[state] =
          prev_optimal_in_now_states_seq[state];
    }
  }

  std::vector<int> optimal_states_seq;

  if (d_vector[window_size - 1] == 0) {
    // special case: the last batch has (d, r) = (0, 0),
    // so take the sequence ending in the base state
    optimal_states_seq = prev_optimal_in_now_states_seq[kBaseState];
  } else if (check_branch_cuttable(d_vector, r_vector, p_vector,
                                   window_size - 1,
                                   burst_cut_threshold)) {
    optimal_states_seq = prev_optimal_in_now_states_seq[kBaseState];
  } else {
    optimal_states_seq =
        prev_optimal_in_now_states_costs[kBaseState] <=
        prev_optimal_in_now_states_costs[kBurstState] ?
            prev_optimal_in_now_states_seq[kBaseState] :
            prev_optimal_in_now_states_seq[kBurstState];
  }

  //
  // calculation of batch_weights
  //

  // reuse of past results
  for (int batch_id = 0; batch_id < reuse_batch_size; batch_id++) {
    if (0 < batch_weights[batch_id]) {
      batch_weights[batch_id] =
          get_batch_weight(d_vector, r_vector, p_vector, batch_id);
    }
  }
  // new calculation
  for (int batch_id = reuse_batch_size; batch_id < window_size; batch_id++) {
    int state = optimal_states_seq[batch_id - reuse_batch_size];
    batch_weights.push_back(state == kBurstState ?
                            get_batch_weight(d_vector, r_vector,
                                             p_vector, batch_id) :
                            0);
  }
}

} // namespace burst
} // namespace core
} // namespace jubatus