jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
lof_storage.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2012 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #include "lof_storage.hpp"
18 
19 #include <algorithm>
20 #include <limits>
21 #include <sstream>
22 #include <stdexcept>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "anomaly_type.hpp"
28 #include "../common/exception.hpp"
29 #include "../common/jsonconfig.hpp"
30 #include "../recommender/euclid_lsh.hpp"
31 #include "../recommender/recommender_factory.hpp"
32 
33 using jubatus::util::data::unordered_map;
34 using jubatus::util::data::unordered_set;
35 using jubatus::util::lang::shared_ptr;
36 using std::istream;
37 using std::istringstream;
38 using std::max;
39 using std::min;
40 using std::numeric_limits;
41 using std::ostream;
42 using std::ostringstream;
43 using std::pair;
44 using std::string;
45 using std::vector;
46 
47 namespace jubatus {
48 namespace core {
49 namespace anomaly {
50 
51 const uint32_t lof_storage::DEFAULT_NEIGHBOR_NUM = 10;
52 const uint32_t lof_storage::DEFAULT_REVERSE_NN_NUM = 30;
53 
55  : nearest_neighbor_num(DEFAULT_NEIGHBOR_NUM),
56  reverse_nearest_neighbor_num(DEFAULT_REVERSE_NN_NUM) {
57 }
58 
62  nn_engine_(recommender::recommender_factory::create_recommender(
63  "euclid_lsh",
64  common::jsonconfig::config(jubatus::util::text::json::to_json(
65  recommender::euclid_lsh::config())), "")) {
66 }
67 
69  shared_ptr<recommender::recommender_base> nn_engine)
70  : neighbor_num_(DEFAULT_NEIGHBOR_NUM),
71  reverse_nn_num_(DEFAULT_REVERSE_NN_NUM),
72  nn_engine_(nn_engine) {
73 }
74 
76  const config& config,
77  shared_ptr<recommender::recommender_base> nn_engine)
78  : neighbor_num_(config.nearest_neighbor_num),
79  reverse_nn_num_(config.reverse_nearest_neighbor_num),
80  nn_engine_(nn_engine) {
81 }
82 
84 }
85 
90  const common::sfv_t& query,
91  unordered_map<string, float>& neighbor_lrd) const {
92  vector<pair<string, float> > neighbors;
93  nn_engine_->neighbor_row(query, neighbors, neighbor_num_);
94 
95  return collect_lrds_from_neighbors(neighbors, neighbor_lrd);
96 }
97 
103  const string& id,
104  unordered_map<string, float>& neighbor_lrd) const {
105  vector<pair<string, float> > neighbors;
106  nn_engine_->neighbor_row(id, neighbors, neighbor_num_ + 1);
107 
108  // neighbor_row returns id itself, so we remove it from the list
109  for (size_t i = 0; i < neighbors.size(); ++i) {
110  if (neighbors[i].first == id) {
111  swap(neighbors[i], neighbors.back());
112  neighbors.pop_back();
113  break;
114  }
115  }
116 
117  return collect_lrds_from_neighbors(neighbors, neighbor_lrd);
118 }
119 
120 void lof_storage::remove_row(const string& row) {
122  nn_engine_->clear_row(row);
123 }
124 
126  lof_table_t().swap(lof_table_);
128  nn_engine_->clear();
129 }
130 
131 void lof_storage::get_all_row_ids(vector<string>& ids) const {
132  nn_engine_->get_all_row_ids(ids);
133 }
134 
135 void lof_storage::update_row(const string& row, const common::sfv_t& diff) {
136  unordered_set<string> update_set;
137 
138  {
139  common::sfv_t query;
140  nn_engine_->decode_row(row, query);
141  if (!query.empty()) {
142  collect_neighbors(row, update_set);
143  }
144  }
145 
146  nn_engine_->update_row(row, diff);
147  collect_neighbors(row, update_set);
148 
149  update_set.insert(row);
150 
151  update_entries(update_set);
152 }
153 
154 string lof_storage::name() const {
155  return "lof_storage";
156 }
157 
158 float lof_storage::get_kdist(const string& row) const {
159  lof_table_t::const_iterator it = lof_table_diff_.find(row);
160  if (it == lof_table_diff_.end()) {
161  it = lof_table_.find(row);
162  if (it == lof_table_.end()) {
163  throw JUBATUS_EXCEPTION(
164  common::exception::runtime_error("specified row does not exist")
165  << common::exception::error_message("row id: " + row));
166  }
167  } else if (is_removed(it->second)) {
168  throw JUBATUS_EXCEPTION(
169  common::exception::runtime_error("specified row is recently removed")
170  << common::exception::error_message("row id: " + row));
171  }
172  return it->second.kdist;
173 }
174 
175 float lof_storage::get_lrd(const string& row) const {
176  lof_table_t::const_iterator it = lof_table_diff_.find(row);
177  if (it == lof_table_diff_.end()) {
178  it = lof_table_.find(row);
179  if (it == lof_table_.end()) {
180  throw JUBATUS_EXCEPTION(
181  common::exception::runtime_error("specified row does not exist")
182  << common::exception::error_message("row id: " + row));
183  }
184  } else if (is_removed(it->second)) {
185  throw JUBATUS_EXCEPTION(
186  common::exception::runtime_error("specified row is recently removed")
187  << common::exception::error_message("row id: " + row));
188  }
189  return it->second.lrd;
190 }
191 
192 bool lof_storage::has_row(const string& row) const {
193  return lof_table_diff_.count(row) > 0 || lof_table_.count(row) > 0;
194 }
195 
197  vector<string> ids;
198  get_all_row_ids(ids);
199 
200  // NOTE: These two loops are separated, since update_lrd requires new kdist
201  // values of k-NN.
202  for (size_t i = 0; i < ids.size(); ++i) {
203  update_kdist(ids[i]);
204  }
205  for (size_t i = 0; i < ids.size(); ++i) {
206  update_lrd(ids[i]);
207  }
208 }
209 
211  shared_ptr<recommender::recommender_base> nn_engine) {
212  nn_engine_ = nn_engine;
213 }
214 
216  diff = lof_table_diff_;
217 }
218 
219 bool lof_storage::put_diff(const lof_table_t& mixed_diff) {
220  for (lof_table_t::const_iterator it = mixed_diff.begin();
221  it != mixed_diff.end(); ++it) {
222  if (is_removed(it->second)) {
223  lof_table_.erase(it->first);
224  } else {
225  lof_table_[it->first] = it->second;
226  }
227  }
228  lof_table_diff_.clear();
229  return true;
230 }
231 
232 void lof_storage::mix(const lof_table_t& lhs, lof_table_t& rhs) const {
233  for (lof_table_t::const_iterator it = lhs.begin(); it != lhs.end(); ++it) {
234  if (is_removed(it->second)) {
235  mark_removed(rhs[it->first]);
236  } else {
237  rhs.insert(*it);
238  }
239  }
240 }
241 
242 // private
243 
244 // static
246  entry.kdist = -1;
247 }
248 
249 // static
250 bool lof_storage::is_removed(const lof_entry& entry) {
251  return entry.kdist < 0;
252 }
253 
261  const vector<pair<string, float> >& neighbors,
262  unordered_map<string, float>& neighbor_lrd) const {
263  if (neighbors.empty()) {
264  return numeric_limits<float>::infinity();
265  }
266 
267  // collect lrd values of the nearest neighbors
268  neighbor_lrd.clear();
269  for (size_t i = 0; i < neighbors.size(); ++i) {
270  neighbor_lrd[neighbors[i].first] = get_lrd(neighbors[i].first);
271  }
272 
273  // return lrd of the query
274  float sum_reachability = 0;
275  for (size_t i = 0; i < neighbors.size(); ++i) {
276  sum_reachability += max(neighbors[i].second, get_kdist(neighbors[i].first));
277  }
278 
279  // All the neighbors seem to have a same feature vector.
280  if (sum_reachability == 0) {
281  return numeric_limits<float>::infinity();
282  }
283 
284  return neighbors.size() / sum_reachability;
285 }
286 
291  const string& row,
292  unordered_set<string>& nn) const {
293  vector<pair<string, float> > neighbors;
294  nn_engine_->neighbor_row(row, neighbors, reverse_nn_num_);
295 
296  for (size_t i = 0; i < neighbors.size(); ++i) {
297  nn.insert(neighbors[i].first);
298  }
299 }
300 
304 void lof_storage::update_entries(const unordered_set<string>& rows) {
305  // NOTE: These two loops are separated, since update_lrd requires new kdist
306  // values of k-NN.
307  typedef unordered_map<string, vector<pair<string, float> > >
308  rows_to_neighbors_type;
309 
310  rows_to_neighbors_type rows_to_neighbors;
311  for (unordered_set<string>::const_iterator it = rows.begin();
312  it != rows.end(); ++it) {
313  nn_engine_->neighbor_row(*it, rows_to_neighbors[*it], neighbor_num_);
314  }
315 
316  for (rows_to_neighbors_type::const_iterator it = rows_to_neighbors.begin();
317  it != rows_to_neighbors.end(); ++it) {
318  update_kdist_with_neighbors(it->first, it->second);
319  }
320  for (rows_to_neighbors_type::const_iterator it = rows_to_neighbors.begin();
321  it != rows_to_neighbors.end(); ++it) {
322  update_lrd_with_neighbors(it->first, it->second);
323  }
324 }
325 
329 void lof_storage::update_kdist(const string& row) {
330  vector<pair<string, float> > neighbors;
331  nn_engine_->neighbor_row(row, neighbors, neighbor_num_);
332  update_kdist_with_neighbors(row, neighbors);
333 }
334 
340  const string& row,
341  const vector<pair<string, float> >& neighbors) {
342  if (!neighbors.empty()) {
343  lof_table_diff_[row].kdist = neighbors.back().second;
344  }
345 }
346 
350 void lof_storage::update_lrd(const string& row) {
351  vector<pair<string, float> > neighbors;
352  nn_engine_->neighbor_row(row, neighbors, neighbor_num_);
353  update_lrd_with_neighbors(row, neighbors);
354 }
355 
360  const string& row, const vector<pair<string, float> >& neighbors) {
361  if (neighbors.empty()) {
362  lof_table_diff_[row].lrd = 1;
363  return;
364  }
365 
366  const size_t length = min(neighbors.size(), size_t(neighbor_num_));
367  float sum_reachability = 0;
368  for (size_t i = 0; i < length; ++i) {
369  sum_reachability += max(neighbors[i].second, get_kdist(neighbors[i].first));
370  }
371 
372  if (sum_reachability == 0) {
373  lof_table_diff_[row].lrd = numeric_limits<float>::infinity();
374  return;
375  }
376 
377  lof_table_diff_[row].lrd = length / sum_reachability;
378 }
379 
380 } // namespace anomaly
381 } // namespace core
382 } // namespace jubatus
static bool is_removed(const lof_entry &entry)
float get_kdist(const std::string &row) const
void update_row(const std::string &row, const common::sfv_t &diff)
void mix(const lof_table_t &lhs, lof_table_t &rhs) const
float get_lrd(const std::string &row) const
static const uint32_t DEFAULT_NEIGHBOR_NUM
Definition: lof_storage.hpp:54
void update_kdist(const std::string &row)
int nearest_neighbor_num
static const uint32_t DEFAULT_REVERSE_NN_NUM
Definition: lof_storage.hpp:55
#define JUBATUS_EXCEPTION(e)
Definition: exception.hpp:79
float kdist
Definition: lof_storage.hpp:44
jubatus::util::lang::shared_ptr< core::recommender::recommender_base > nn_engine_
void update_lrd_with_neighbors(const std::string &row, const std::vector< std::pair< std::string, float > > &neighbors)
void get_diff(lof_table_t &diff) const
void update_entries(const jubatus::util::data::unordered_set< std::string > &rows)
bool put_diff(const lof_table_t &mixed_diff)
void swap(weighted_point &p1, weighted_point &p2)
Definition: types.hpp:47
void get_all_row_ids(std::vector< std::string > &ids) const
void set_nn_engine(jubatus::util::lang::shared_ptr< core::recommender::recommender_base > nn_engine)
void collect_neighbors(const std::string &row, jubatus::util::data::unordered_set< std::string > &nn) const
static void mark_removed(lof_entry &entry)
float collect_lrds(const common::sfv_t &query, jubatus::util::data::unordered_map< std::string, float > &neighbor_lrd) const
std::vector< std::pair< std::string, float > > sfv_t
Definition: type.hpp:29
void remove_row(const std::string &row)
Definition: lof_storage.hpp:43
bool has_row(const std::string &row) const
void update_kdist_with_neighbors(const std::string &row, const std::vector< std::pair< std::string, float > > &neighbors)
void update_lrd(const std::string &row)
jubatus::util::data::unordered_map< std::string, lof_entry > lof_table_t
Definition: lof_storage.hpp:50
float collect_lrds_from_neighbors(const std::vector< std::pair< std::string, float > > &neighbors, jubatus::util::data::unordered_map< std::string, float > &neighbor_lrd) const