jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
Classes | Public Member Functions | Static Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
jubatus::core::burst::burst::impl_ Class Reference
Inheritance diagram for jubatus::core::burst::burst::impl_:
Inheritance graph
Collaboration diagram for jubatus::core::burst::burst::impl_:
Collaboration graph

Classes

class  aggregate_helper_
 
class  storage_
 

Public Member Functions

bool add_document (const string &str, double pos)
 
bool add_keyword (const string &keyword, const keyword_params &params, bool processed_in_this_server)
 
void calculate_results ()
 
void clear ()
 
result_map get_all_bursted_results () const
 
result_map get_all_bursted_results_at (double pos) const
 
keyword_list get_all_keywords () const
 
void get_diff (diff_t &ret) const
 
keyword_list get_processed_keywords () const
 
result_t get_result (const string &keyword) const
 
result_t get_result_at (const string &keyword, double pos) const
 
storage::version get_version () const
 
bool has_been_mixed () const
 
 impl_ (const burst_options &options)
 
void pack (framework::packer &packer) const
 
bool put_diff (const diff_t &diff)
 
bool remove_all_keywords ()
 
bool remove_keyword (const string &keyword)
 
void set_processed_keywords (const std::vector< string > &keywords)
 
void unpack (msgpack::object o)
 

Static Public Member Functions

template<class Map >
static keyword_list get_keyword_list (const Map &m)
 

Private Types

typedef unordered_map< string, aggregate_helper_ > aggregators_t
 
typedef unordered_map< string, storage_ > storages_t
 

Private Member Functions

const result_storage * get_storage_ (const string &keyword) const
 

Static Private Member Functions

static void unpack_impl_ (msgpack::object o, burst_options &unpacked_options, aggregators_t &unpacked_aggregators, storages_t &unpacked_storages)
 

Private Attributes

aggregators_t aggregators_
 
bool has_been_mixed_
 
burst_options options_
 
storages_t storages_
 

Detailed Description

Definition at line 76 of file burst.cpp.

Member Typedef Documentation

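typedef unordered_map<string, aggregate_helper_> jubatus::core::burst::burst::impl_::aggregators_t
private

Definition at line 173 of file burst.cpp.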

typedef unordered_map<string, storage_> jubatus::core::burst::burst::impl_::storages_t
private

Definition at line 172 of file burst.cpp.

Constructor & Destructor Documentation

jubatus::core::burst::burst::impl_::impl_ ( const burst_options &  options)
inline explicit

Definition at line 176 of file burst.cpp.

References jubatus::core::burst::burst_options::batch_interval, jubatus::core::burst::burst_options::costcut_threshold, JUBATUS_EXCEPTION, jubatus::core::burst::burst_options::max_reuse_batch_num, options_, jubatus::core::burst::burst_options::result_window_rotate_size, and jubatus::core::burst::burst_options::window_batch_size.

177  : options_(options),
178  has_been_mixed_(false) {
179  if (!(options_.window_batch_size > 0)) {
180  throw JUBATUS_EXCEPTION(common::invalid_parameter(
181  "window_batch_size should > 0"));
182  }
183  if (!(options_.batch_interval > 0)) {
184  throw JUBATUS_EXCEPTION(common::invalid_parameter(
185  "batch_interval should > 0"));
186  }
187  if (!(options_.result_window_rotate_size > 0)) {
188  throw JUBATUS_EXCEPTION(common::invalid_parameter(
189  "result_window_rotate_size should > 0"));
190  }
191  if (!(options_.max_reuse_batch_num >= 0)) {
192  throw JUBATUS_EXCEPTION(common::invalid_parameter(
193  "max_reuse_batch_num should >= 0"));
194  }
195  if (!(options_.costcut_threshold > 0)) {
196  options_.costcut_threshold = DBL_MAX;
197  }
198  }
#define JUBATUS_EXCEPTION(e)
Definition: exception.hpp:79

Member Function Documentation

bool jubatus::core::burst::burst::impl_::add_document ( const string &  str,
double  pos 
)
inline

Definition at line 267 of file burst.cpp.

References jubatus::core::burst::burst::impl_::aggregate_helper_::add_document(), and aggregators_.

267  {
268  bool result = true;
269  for (aggregators_t::iterator iter = aggregators_.begin();
270  iter != aggregators_.end(); ++iter) {
271  const string& keyword = iter->first;
272  aggregate_helper_& a = iter->second;
273  int r = str.find(keyword) != str.npos ? 1 : 0;
274  result = a.add_document(1, r, pos) && result;
275  }
276  return result;
277  }

Here is the call graph for this function:

bool jubatus::core::burst::burst::impl_::add_keyword ( const string &  keyword,
const keyword_params &  params,
bool  processed_in_this_server 
)
inline

Definition at line 200 of file burst.cpp.

References aggregators_, jubatus::core::burst::keyword_params::gamma, JUBATUS_ASSERT_EQ, JUBATUS_EXCEPTION, options_, jubatus::core::burst::keyword_params::scaling_param, and storages_.

202  {
203  if (!(params.scaling_param > 1)) {
204  throw JUBATUS_EXCEPTION(
205  common::invalid_parameter("scaling_param must be > 1."));
206  }
207  if (!(params.gamma > 0)) {
208  throw JUBATUS_EXCEPTION(
209  common::invalid_parameter("gamma must be > 0."));
210  }
211 
212  if (storages_.count(keyword) > 0 || aggregators_.count(keyword) > 0) {
213  return false;
214  }
215 
216  std::pair<storages_t::iterator, bool> r =
217  storages_.insert(std::make_pair(keyword, storage_(options_, params)));
218 
219  JUBATUS_ASSERT_EQ(true, r.second, "");
220 
221  if (processed_in_this_server) {
222  aggregators_.insert(
223  std::make_pair(keyword,
224  aggregate_helper_(options_, r.first->second)));
225  }
226 
227  return true;
228  }
#define JUBATUS_ASSERT_EQ(a, b, messages)
Definition: assert.hpp:63
#define JUBATUS_EXCEPTION(e)
Definition: exception.hpp:79
void jubatus::core::burst::burst::impl_::calculate_results ( )
inline

Definition at line 279 of file burst.cpp.

References aggregators_, and options_.

279  {
280  for (aggregators_t::iterator iter = aggregators_.begin();
281  iter != aggregators_.end(); ++iter) {
282  iter->second.calculate_result(options_);
283  }
284  }
void jubatus::core::burst::burst::impl_::clear ( )
inline

Definition at line 405 of file burst.cpp.

References aggregators_, and storages_.

Referenced by remove_all_keywords().

405  {
406  aggregators_t().swap(aggregators_);
407  storages_t().swap(storages_);
408  }
unordered_map< string, aggregate_helper_ > aggregators_t
Definition: burst.cpp:173
unordered_map< string, storage_ > storages_t
Definition: burst.cpp:172

Here is the caller graph for this function:

result_map jubatus::core::burst::burst::impl_::get_all_bursted_results ( ) const
inline

Definition at line 302 of file burst.cpp.

References aggregators_, jubatus::core::burst::result_storage::get_latest_result(), and jubatus::core::burst::burst_result::is_bursted_at_latest_batch().

302  {
303  result_map results;
304  for (aggregators_t::const_iterator iter = aggregators_.begin();
305  iter != aggregators_.end(); ++iter) {
306  const result_storage& s = *iter->second.get_storage();
307  result_t result = s.get_latest_result();
308  if (result.is_bursted_at_latest_batch()) {
309  results.insert(std::make_pair(iter->first, result));
310  }
311  }
312  return results;
313  }
burst_result result_t
Definition: burst.hpp:72
jubatus::util::data::unordered_map< std::string, result_t > result_map
Definition: burst.hpp:73

Here is the call graph for this function:

result_map jubatus::core::burst::burst::impl_::get_all_bursted_results_at ( double  pos) const
inline

Definition at line 315 of file burst.cpp.

References aggregators_, jubatus::core::burst::result_storage::get_result_at(), and jubatus::core::burst::burst_result::is_bursted_at().

315  {
316  result_map results;
317  for (aggregators_t::const_iterator iter = aggregators_.begin();
318  iter != aggregators_.end(); ++iter) {
319  const result_storage& s = *iter->second.get_storage();
320  result_t result = s.get_result_at(pos);
321  if (result.is_bursted_at(pos)) {
322  results.insert(std::make_pair(iter->first, result));
323  }
324  }
325  return results;
326  }
burst_result result_t
Definition: burst.hpp:72
jubatus::util::data::unordered_map< std::string, result_t > result_map
Definition: burst.hpp:73

Here is the call graph for this function:

keyword_list jubatus::core::burst::burst::impl_::get_all_keywords ( ) const
inline

Definition at line 260 of file burst.cpp.

References get_keyword_list(), and storages_.

260  {
261  return get_keyword_list(storages_);
262  }
static keyword_list get_keyword_list(const Map &m)
Definition: burst.cpp:241

Here is the call graph for this function:

void jubatus::core::burst::burst::impl_::get_diff ( diff_t &  ret) const
inline

Definition at line 328 of file burst.cpp.

References aggregators_, and jubatus::core::burst::burst::diff_t::p_.

328  {
329  shared_ptr<diff_t::impl_> diff(new diff_t::impl_());
330  diff_t::impl_::data_t& data = diff->data;
331 
332  for (aggregators_t::const_iterator iter = aggregators_.begin();
333  iter != aggregators_.end(); ++iter) {
334  const string& keyword = iter->first;
335  diff_t::impl_::entry_t entry = iter->second.get_diff();
336  data.insert(std::make_pair(keyword, entry));
337  }
338 
339  ret.p_ = diff;
340  }
unordered_map< string, entry_t > data_t
Definition: burst.cpp:48
template<class Map >
static keyword_list jubatus::core::burst::burst::impl_::get_keyword_list ( const Map &  m)
inline static

Definition at line 241 of file burst.cpp.

References jubatus::core::burst::keyword_params::gamma, jubatus::core::burst::keyword_with_params::gamma, jubatus::core::burst::keyword_with_params::keyword, jubatus::core::burst::keyword_params::scaling_param, and jubatus::core::burst::keyword_with_params::scaling_param.

Referenced by get_all_keywords(), and get_processed_keywords().

241  {
242  keyword_list result;
243  result.reserve(m.size());
244 
245  for (typename Map::const_iterator iter = m.begin();
246  iter != m.end(); ++iter) {
247  const string& keyword = iter->first;
248  const keyword_params& params = iter->second.get_params();
249 
250  result.push_back(keyword_with_params());
251  keyword_with_params& x = result.back();
252  x.keyword = keyword;
253  x.scaling_param = params.scaling_param;
254  x.gamma = params.gamma;
255  }
256 
257  return result;
258  }
std::vector< keyword_with_params > keyword_list
Definition: burst.hpp:74

Here is the caller graph for this function:

keyword_list jubatus::core::burst::burst::impl_::get_processed_keywords ( ) const
inline

Definition at line 263 of file burst.cpp.

References aggregators_, and get_keyword_list().

263  {
264  return get_keyword_list(aggregators_);
265  }
static keyword_list get_keyword_list(const Map &m)
Definition: burst.cpp:241

Here is the call graph for this function:

result_t jubatus::core::burst::burst::impl_::get_result ( const string &  keyword) const
inline

Definition at line 286 of file burst.cpp.

References jubatus::core::burst::result_storage::get_latest_result(), and get_storage_().

286  {
287  const result_storage* s = get_storage_(keyword);
288  if (s == NULL) {
289  return result_t();
290  }
291  return s->get_latest_result();
292  }
burst_result result_t
Definition: burst.hpp:72
const result_storage * get_storage_(const string &keyword) const
Definition: burst.cpp:454

Here is the call graph for this function:

result_t jubatus::core::burst::burst::impl_::get_result_at ( const string &  keyword,
double  pos 
) const
inline

Definition at line 294 of file burst.cpp.

References jubatus::core::burst::result_storage::get_result_at(), and get_storage_().

294  {
295  const result_storage* s = get_storage_(keyword);
296  if (s == NULL) {
297  return result_t();
298  }
299  return s->get_result_at(pos);
300  }
burst_result result_t
Definition: burst.hpp:72
const result_storage * get_storage_(const string &keyword) const
Definition: burst.cpp:454

Here is the call graph for this function:

const result_storage* jubatus::core::burst::burst::impl_::get_storage_ ( const string &  keyword) const
inline private

Definition at line 454 of file burst.cpp.

Referenced by get_result(), and get_result_at().

454  {
455  storages_t::const_iterator iter = storages_.find(keyword);
456  if (iter == storages_.end()) {
457  return NULL;
458  }
459  return iter->second.get_storage().get();
460  }

Here is the caller graph for this function:

storage::version jubatus::core::burst::burst::impl_::get_version ( ) const
inline

Definition at line 409 of file burst.cpp.

409  {
410  return storage::version();
411  }
bool jubatus::core::burst::burst::impl_::has_been_mixed ( ) const
inline

Definition at line 377 of file burst.cpp.

References has_been_mixed_.

377  {
378  return has_been_mixed_;
379  }
void jubatus::core::burst::burst::impl_::pack ( framework::packer &  packer) const
inline

Definition at line 413 of file burst.cpp.

References aggregators_, options_, and storages_.

413  {
414  packer.pack_array(3);
415 
416  packer.pack(options_);
417 
418  packer.pack_map(storages_.size());
419  for (storages_t::const_iterator iter = storages_.begin();
420  iter != storages_.end(); ++iter) {
421  packer.pack(iter->first);
422  packer.pack_array(2);
423  packer.pack(iter->second.get_params());
424  iter->second.get_storage()->pack(packer);
425  }
426 
427  packer.pack_map(aggregators_.size());
428  for (aggregators_t::const_iterator iter = aggregators_.begin();
429  iter != aggregators_.end(); ++iter) {
430  packer.pack(iter->first);
431  iter->second.get_aggregator()->pack(packer);
432  }
433  }
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
bool jubatus::core::burst::burst::impl_::put_diff ( const diff_t &  diff)
inline

Definition at line 341 of file burst.cpp.

References aggregators_, has_been_mixed_, options_, jubatus::core::burst::burst::diff_t::p_, and storages_.

341  {
342  const diff_t::impl_::data_t& data = diff.p_->data;
343 
344  for (diff_t::impl_::data_t::const_iterator iter = data.begin();
345  iter != data.end(); ++iter) {
346  const std::string& keyword = iter->first;
347  storages_t::iterator found = storages_.find(keyword);
348 
349  if (found == storages_.end()) {
350  const keyword_params params = iter->second.params;
351  std::pair<storages_t::iterator, bool> r =
352  storages_.insert(
353  std::make_pair(keyword, storage_(options_, params)));
354  found = r.first;
355  }
356 
357  found->second.put_diff(iter->second);
358  }
359 
360  std::vector<std::string> to_be_removed;
361 
362  for (aggregators_t::iterator iter = aggregators_.begin();
363  iter != aggregators_.end(); ++iter) {
364  iter->second.tick_removal_count();
365  if (iter->second.is_to_be_removed()) {
366  to_be_removed.push_back(iter->first);
367  }
368  }
369 
370  for (size_t i = 0; i < to_be_removed.size(); ++i) {
371  aggregators_.erase(to_be_removed[i]);
372  }
373 
374  has_been_mixed_ = true;
375  return true;
376  }
unordered_map< string, entry_t > data_t
Definition: burst.cpp:48
bool jubatus::core::burst::burst::impl_::remove_all_keywords ( )
inline

Definition at line 235 of file burst.cpp.

References clear().

235  {
236  clear();
237  return true;
238  }

Here is the call graph for this function:

bool jubatus::core::burst::burst::impl_::remove_keyword ( const string &  keyword)
inline

Definition at line 230 of file burst.cpp.

References aggregators_, and storages_.

230  {
231  aggregators_.erase(keyword);
232  return storages_.erase(keyword) > 0;
233  }
void jubatus::core::burst::burst::impl_::set_processed_keywords ( const std::vector< string > &  keywords)
inline

Definition at line 381 of file burst.cpp.

References aggregators_, JUBATUS_EXCEPTION, options_, and storages_.

381  {
382  for (aggregators_t::iterator iter = aggregators_.begin();
383  iter != aggregators_.end(); ++iter) {
384  iter->second.set_unprocessed();
385  }
386 
387  for (size_t i = 0; i < keywords.size(); ++i) {
388  aggregators_t::iterator found = aggregators_.find(keywords[i]);
389  if (found != aggregators_.end()) {
390  found->second.set_processed();
391  } else {
392  storages_t::iterator s = storages_.find(keywords[i]);
393  if (s == storages_.end()) {
394  throw JUBATUS_EXCEPTION(
395  common::exception::runtime_error(
396  "something went wrong in burst"));
397  }
398  aggregators_.insert(
399  std::make_pair(keywords[i],
400  aggregate_helper_(options_, s->second)));
401  }
402  }
403  }
#define JUBATUS_EXCEPTION(e)
Definition: exception.hpp:79
void jubatus::core::burst::burst::impl_::unpack ( msgpack::object  o)
inline

Definition at line 435 of file burst.cpp.

References aggregators_, options_, storages_, and unpack_impl_().

435  {
436  burst_options unpacked_options = options_;
437  aggregators_t unpacked_aggregators;
438  storages_t unpacked_storages;
439 
440  unpack_impl_(o, unpacked_options, unpacked_aggregators, unpacked_storages);
441 
442  // assign
443  options_ = unpacked_options;
444  storages_.swap(unpacked_storages);
445  aggregators_.swap(unpacked_aggregators);
446  }
static void unpack_impl_(msgpack::object o, burst_options &unpacked_options, aggregators_t &unpacked_aggregators, storages_t &unpacked_storages)
Definition: burst.cpp:462
unordered_map< string, aggregate_helper_ > aggregators_t
Definition: burst.cpp:173
unordered_map< string, storage_ > storages_t
Definition: burst.cpp:172

Here is the call graph for this function:

static void jubatus::core::burst::burst::impl_::unpack_impl_ ( msgpack::object  o,
burst_options &  unpacked_options,
aggregators_t &  unpacked_aggregators,
storages_t &  unpacked_storages 
)
inline static private

Definition at line 462 of file burst.cpp.

References jubatus::core::burst::burst::impl_::aggregate_helper_::get_aggregator(), and jubatus::core::burst::burst::impl_::storage_::get_storage().

Referenced by unpack().

465  {
466  if (o.type != msgpack::type::ARRAY || o.via.array.size != 3) {
467  throw msgpack::type_error();
468  }
469 
470  o.via.array.ptr[0].convert(&unpacked_options);
471 
472  {
473  const msgpack::object& m = o.via.array.ptr[1];
474  if (m.type != msgpack::type::MAP) {
475  throw msgpack::type_error();
476  }
477  size_t n = m.via.map.size;
478  for (size_t i = 0; i < n; ++i) {
479  string keyword;
480  m.via.map.ptr[i].key.convert(&keyword);
481 
482  std::pair<keyword_params, msgpack::object> val;
483  m.via.map.ptr[i].val.convert(&val);
484  storage_ s(unpacked_options, val.first);
485  s.get_storage()->unpack(val.second);
486 
487  unpacked_storages.insert(std::make_pair(keyword, s));
488  }
489  }
490 
491  {
492  const msgpack::object& m = o.via.array.ptr[2];
493  if (m.type != msgpack::type::MAP) {
494  throw msgpack::type_error();
495  }
496  size_t n = m.via.map.size;
497  for (size_t i = 0; i < n; ++i) {
498  string keyword;
499  m.via.map.ptr[i].key.convert(&keyword);
500 
501  storages_t::const_iterator iter = unpacked_storages.find(keyword);
502  if (iter == unpacked_storages.end()) {
503  throw msgpack::type_error();
504  }
505 
506  aggregate_helper_ a(unpacked_options, iter->second);
507  a.get_aggregator()->unpack(m.via.map.ptr[i].val);
508 
509  unpacked_aggregators.insert(std::make_pair(keyword, a));
510  }
511  }
512  }

Here is the call graph for this function:

Here is the caller graph for this function:

Member Data Documentation

aggregators_t jubatus::core::burst::burst::impl_::aggregators_
private
bool jubatus::core::burst::burst::impl_::has_been_mixed_
private

Definition at line 452 of file burst.cpp.

Referenced by has_been_mixed(), and put_diff().

burst_options jubatus::core::burst::burst::impl_::options_
private
storages_t jubatus::core::burst::burst::impl_::storages_
private

The documentation for this class was generated from the following file: