23 #include "jubatus/util/lang/noncopyable.h"
24 #include "jubatus/util/data/unordered_map.h"
26 #include "../common/assert.hpp"
27 #include "../common/exception.hpp"
28 #include "../common/unordered_map.hpp"
32 using jubatus::util::lang::shared_ptr;
33 using jubatus::util::data::unordered_map;
48 typedef unordered_map<string, entry_t>
data_t;
55 for (data_t::const_iterator iter = y.
data.begin();
56 iter != y.
data.end(); ++iter) {
57 data_t::iterator found = data.find(iter->first);
58 if (found == data.end()) {
61 std::vector<burst_result>& results1 = found->second.results;
62 const std::vector<burst_result>& results2 = iter->second.results;
64 results1.insert(results1.end(), results2.begin(), results2.end());
69 explicit impl_(msgpack::object o) : data() {
98 shared_ptr<result_storage>
s_;
107 options.batch_interval,
108 options.result_window_rotate_size)),
115 return a_->add_document(d, r, pos);
166 shared_ptr<aggregator>
a_;
167 shared_ptr<result_storage>
s_;
181 "window_batch_size should > 0"));
185 "batch_interval should > 0"));
189 "result_window_rotate_size should > 0"));
193 "max_reuse_batch_num should >= 0"));
202 bool processed_in_this_server) {
207 if (!(params.
gamma > 0)) {
216 std::pair<storages_t::iterator, bool> r =
221 if (processed_in_this_server) {
223 std::make_pair(keyword,
243 result.reserve(m.size());
245 for (
typename Map::const_iterator iter = m.begin();
246 iter != m.end(); ++iter) {
247 const string& keyword = iter->first;
269 for (aggregators_t::iterator iter =
aggregators_.begin();
271 const string& keyword = iter->first;
273 int r = str.find(keyword) != str.npos ? 1 : 0;
280 for (aggregators_t::iterator iter =
aggregators_.begin();
282 iter->second.calculate_result(
options_);
304 for (aggregators_t::const_iterator iter =
aggregators_.begin();
309 results.insert(std::make_pair(iter->first, result));
317 for (aggregators_t::const_iterator iter =
aggregators_.begin();
322 results.insert(std::make_pair(iter->first, result));
332 for (aggregators_t::const_iterator iter =
aggregators_.begin();
334 const string& keyword = iter->first;
336 data.insert(std::make_pair(keyword, entry));
344 for (diff_t::impl_::data_t::const_iterator iter = data.begin();
345 iter != data.end(); ++iter) {
346 const std::string& keyword = iter->first;
347 storages_t::iterator found =
storages_.find(keyword);
351 std::pair<storages_t::iterator, bool> r =
357 found->second.put_diff(iter->second);
360 std::vector<std::string> to_be_removed;
362 for (aggregators_t::iterator iter =
aggregators_.begin();
364 iter->second.tick_removal_count();
365 if (iter->second.is_to_be_removed()) {
366 to_be_removed.push_back(iter->first);
370 for (
size_t i = 0; i < to_be_removed.size(); ++i) {
382 for (aggregators_t::iterator iter =
aggregators_.begin();
384 iter->second.set_unprocessed();
387 for (
size_t i = 0; i < keywords.size(); ++i) {
388 aggregators_t::iterator found =
aggregators_.find(keywords[i]);
390 found->second.set_processed();
392 storages_t::iterator s =
storages_.find(keywords[i]);
396 "something went wrong in burst"));
399 std::make_pair(keywords[i],
414 packer.pack_array(3);
419 for (storages_t::const_iterator iter =
storages_.begin();
421 packer.pack(iter->first);
422 packer.pack_array(2);
423 packer.pack(iter->second.get_params());
424 iter->second.get_storage()->pack(packer);
428 for (aggregators_t::const_iterator iter =
aggregators_.begin();
430 packer.pack(iter->first);
431 iter->second.get_aggregator()->pack(packer);
437 aggregators_t unpacked_aggregators;
438 storages_t unpacked_storages;
440 unpack_impl_(o, unpacked_options, unpacked_aggregators, unpacked_storages);
455 storages_t::const_iterator iter = storages_.find(keyword);
456 if (iter == storages_.end()) {
459 return iter->second.get_storage().get();
464 aggregators_t& unpacked_aggregators,
465 storages_t& unpacked_storages) {
466 if (o.type != msgpack::type::ARRAY || o.via.array.size != 3) {
467 throw msgpack::type_error();
470 o.via.array.ptr[0].convert(&unpacked_options);
473 const msgpack::object& m = o.via.array.ptr[1];
474 if (m.type != msgpack::type::MAP) {
475 throw msgpack::type_error();
477 size_t n = m.via.map.size;
478 for (
size_t i = 0; i < n; ++i) {
480 m.via.map.ptr[i].key.convert(&keyword);
482 std::pair<keyword_params, msgpack::object> val;
483 m.via.map.ptr[i].val.convert(&val);
484 storage_ s(unpacked_options, val.first);
487 unpacked_storages.insert(std::make_pair(keyword, s));
492 const msgpack::object& m = o.via.array.ptr[2];
493 if (m.type != msgpack::type::MAP) {
494 throw msgpack::type_error();
496 size_t n = m.via.map.size;
497 for (
size_t i = 0; i < n; ++i) {
499 m.via.map.ptr[i].key.convert(&keyword);
501 storages_t::const_iterator iter = unpacked_storages.find(keyword);
502 if (iter == unpacked_storages.end()) {
503 throw msgpack::type_error();
509 unpacked_aggregators.insert(std::make_pair(keyword, a));
516 : p_(new
impl_(options)) {
522 bool burst::add_keyword(
const string& keyword,
524 bool processed_in_this_server) {
526 return p_->add_keyword(keyword, params, processed_in_this_server);
529 bool burst::remove_keyword(
const string& keyword) {
531 return p_->remove_keyword(keyword);
534 bool burst::remove_all_keywords() {
536 return p_->remove_all_keywords();
541 return p_->get_all_keywords();
546 return p_->get_processed_keywords();
549 bool burst::add_document(
const string& str,
double pos) {
551 return p_->add_document(str, pos);
554 void burst::calculate_results() {
556 p_->calculate_results();
561 return p_->get_result(keyword);
564 const std::string& keyword,
double pos)
const {
566 return p_->get_result_at(keyword, pos);
570 return p_->get_all_bursted_results();
574 return p_->get_all_bursted_results_at(pos);
577 void burst::set_processed_keywords(
const std::vector<string>& keywords) {
579 return p_->set_processed_keywords(keywords);
582 void burst::diff_t::mix(
const diff_t& mixed) {
585 }
else if (!mixed.
p_) {
593 packer.pack_array(0);
595 p_->msgpack_pack(packer);
607 return p_->put_diff(diff);
611 return p_->has_been_mixed();
620 return p_->get_version();
result_t get_result_at(double pos) const
bool remove_all_keywords()
storage::version get_version() const
#define JUBATUS_ASSERT_EQ(a, b, messages)
storage_(const burst_options &options, const keyword_params ¶ms)
shared_ptr< result_storage > s_
const shared_ptr< aggregator > & get_aggregator() const
bool remove_keyword(const string &keyword)
jubatus::util::lang::shared_ptr< const impl_ > p_
impl_(const impl_ &x, const impl_ &y)
bool is_to_be_removed() const
const shared_ptr< result_storage > & get_storage() const
void set_processed_keywords(const std::vector< string > &keywords)
int survival_mix_count_from_set_unprocessed
jubatus::util::lang::scoped_ptr< impl_ > p_
result_t get_latest_result() const
void pack(framework::packer &packer) const
bool add_document(const string &str, double pos)
void pack(framework::packer &packer) const
bool is_bursted_at(double pos) const
keyword_list get_all_keywords() const
std::vector< burst_result > results
void msgpack_pack(framework::packer &packer) const
#define JUBATUS_ASSERT(expr)
#define JUBATUS_EXCEPTION(e)
void put_diff(const diff_t::impl_::entry_t &diff) const
bool add_document(int d, int r, double pos) const
aggregators_t aggregators_
shared_ptr< aggregator > a_
bool put_diff(const diff_t &diff)
const result_storage * get_storage_(const string &keyword) const
diff_t::impl_::entry_t get_diff() const
result_map get_all_bursted_results() const
void calculate_result(const burst_options &options) const
void msgpack_unpack(msgpack::object o)
int result_window_rotate_size
impl_(const burst_options &options)
keyword_list get_processed_keywords() const
bool is_bursted_at_latest_batch() const
void unpack(msgpack::object o)
bool has_been_mixed() const
bool put_diff(const diff_t &)
void tick_removal_count()
msgpack::packer< jubatus_packer > packer
storage::version get_version() const
static void unpack_impl_(msgpack::object o, burst_options &unpacked_options, aggregators_t &unpacked_aggregators, storages_t &unpacked_storages)
static keyword_list get_keyword_list(const Map &m)
result_t get_result(const string &keyword) const
const keyword_params & get_params() const
void get_diff(diff_t &ret) const
bool has_been_mixed() const
unordered_map< string, aggregate_helper_ > aggregators_t
result_map get_all_bursted_results_at(double pos) const
unordered_map< string, entry_t > data_t
const keyword_params & get_params() const
const shared_ptr< result_storage > & get_storage() const
bool add_keyword(const string &keyword, const keyword_params ¶ms, bool processed_in_this_server)
void unpack(msgpack::object o)
void get_diff(diff_t &ret) const
result_t get_result_at(const string &keyword, double pos) const
jubatus::util::data::unordered_map< std::string, result_t > result_map
MSGPACK_DEFINE(params, results)
aggregate_helper_(const burst_options &options, const storage_ &s)
unordered_map< string, storage_ > storages_t
std::vector< keyword_with_params > keyword_list
shared_ptr< result_storage > s_