jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
abstract_column.hpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2012,2013 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 
18 #ifndef JUBATUS_CORE_STORAGE_ABSTRACT_COLUMN_HPP_
19 #define JUBATUS_CORE_STORAGE_ABSTRACT_COLUMN_HPP_
20 
21 #include <algorithm>
22 #include <iterator>
23 #include <memory>
24 #include <string>
25 #include <vector>
26 #include <iosfwd>
27 #include <msgpack.hpp>
28 #include "jubatus/util/lang/demangle.h"
29 #include "jubatus/util/lang/noncopyable.h"
30 #include "../common/assert.hpp"
31 #include "../framework/packer.hpp"
32 #include "storage_exception.hpp"
33 #include "bit_vector.hpp"
34 #include "column_type.hpp"
35 
36 namespace jubatus {
37 namespace core {
38 namespace storage {
39 
40 namespace detail {
41 
42 class abstract_column_base : jubatus::util::lang::noncopyable {
43  public:
45  : my_type_(type) {
46  }
48  }
49 
50  column_type type() const {
51  return my_type_;
52  }
53 
// Generates the default push_back/insert/update overloads for value type
// `tp`. Every generated function unconditionally throws
// type_unmatch_exception whose message names the column's own type
// (expected) and `tp` (actual).
#define JUBATUS_GEN_FUNCTIONS_(tp) \
  virtual void push_back(const tp& value) { \
    throw type_unmatch_exception( \
        "column: invalid type in push_back(): expected: " + \
        my_type_.type_as_string() + \
        ", actual: " + \
        jubatus::util::lang::get_typename<tp>()); \
  } \
  virtual bool insert(uint64_t target, const tp&) { \
    throw type_unmatch_exception( \
        "column: invalid type in insert(): expected: " + \
        my_type_.type_as_string() + \
        ", actual: " + \
        jubatus::util::lang::get_typename<tp>()); \
  } \
  virtual bool update(uint64_t target, const tp&) { \
    throw type_unmatch_exception( \
        "column: invalid type in update(): expected: " + \
        my_type_.type_as_string() + \
        ", actual: " + \
        jubatus::util::lang::get_typename<tp>()); \
  }
73 
74  JUBATUS_GEN_FUNCTIONS_(uint8_t); // NOLINT
75  JUBATUS_GEN_FUNCTIONS_(uint16_t); // NOLINT
76  JUBATUS_GEN_FUNCTIONS_(uint32_t); // NOLINT
77  JUBATUS_GEN_FUNCTIONS_(uint64_t); // NOLINT
78  JUBATUS_GEN_FUNCTIONS_(int8_t); // NOLINT
79  JUBATUS_GEN_FUNCTIONS_(int16_t); // NOLINT
80  JUBATUS_GEN_FUNCTIONS_(int32_t); // NOLINT
81  JUBATUS_GEN_FUNCTIONS_(int64_t); // NOLINT
82  JUBATUS_GEN_FUNCTIONS_(float); // NOLINT
83  JUBATUS_GEN_FUNCTIONS_(double); // NOLINT
84  JUBATUS_GEN_FUNCTIONS_(std::string); // NOLINT
86  JUBATUS_GEN_FUNCTIONS_(msgpack::object); // NOLINT
87  #undef JUBATUS_GEN_FUNCTIONS_
88 
89  virtual bool remove(uint64_t target) = 0;
90  virtual void clear() = 0;
91  virtual void pack_with_index(
92  const uint64_t index, framework::packer& pk) const {
93  }
94  virtual void dump(std::ostream& os, uint64_t target) const = 0;
95 
96  private:
98 };
99 
100 } // namespace detail
101 
102 template <typename T>
104  public:
105  explicit typed_column(const column_type& type)
106  : detail::abstract_column_base(type) {
107  }
108 
109  using detail::abstract_column_base::push_back;
110  using detail::abstract_column_base::insert;
111  using detail::abstract_column_base::update;
112 
113  void push_back(const T& value) {
114  array_.push_back(value);
115  }
116  void push_back(const msgpack::object& obj) {
117  typed_column::push_back(obj.as<T>());
118  }
119 
120  bool insert(uint64_t target, const T& value) {
121  if (size() < target) {
122  return false;
123  }
124  array_.insert(array_.begin() + target, value);
125  return true;
126  }
127  bool insert(uint64_t target, const msgpack::object& obj) {
128  return typed_column::insert(target, obj.as<T>());
129  }
130 
131  bool update(uint64_t index, const T& value) {
132  if (size() <= index) {
133  return false;
134  }
135  array_[index] = value;
136  return true;
137  }
138  bool update(uint64_t target, const msgpack::object& obj) {
139  return typed_column::update(target, obj.as<T>());
140  }
141 
142  bool remove(uint64_t target) {
143  if (size() <= target) {
144  return false;
145  }
146  using std::swap;
147  swap(array_[target], array_.back());
148  array_.pop_back();
149  return true;
150  }
151  void clear() {
152  array_.clear();
153  }
154 
155  uint64_t size() const {
156  return array_.size();
157  }
158 
159  const T& operator[](uint64_t index) const {
160  if (size() <= index) {
162  "invalid index [" +
163  jubatus::util::lang::lexical_cast<std::string>(index) +
164  "] for [" +
165  jubatus::util::lang::lexical_cast<std::string>(array_.size()));
166  }
167  return array_[index];
168  }
169 
170  T& operator[](uint64_t index) {
171  if (size() <= index) {
173  "invalid index [" +
174  jubatus::util::lang::lexical_cast<std::string>(index) +
175  "] for [" +
176  jubatus::util::lang::lexical_cast<std::string>(array_.size()));
177  }
178  return array_[index];
179  }
180 
182  const uint64_t index, framework::packer& pk) const {
183  pk.pack((*this)[index]);
184  }
185 
186  friend std::ostream& operator<<(std::ostream& os,
187  const typed_column<T>& it) {
188  os << "[column (" << it.type().type_as_string() << ")"
189  << " size:" << it.size() << " {" << std::endl;
190  for (size_t i = 0; i < it.size(); ++i) {
191  it.dump(os, i);
192  }
193  os << "} ]" << std::endl;
194  return os;
195  }
196 
197  void dump(std::ostream& os, uint64_t target) const {
198  os << "[" << target << "] " << (*this)[target] << std::endl;
199  }
200 
201  template<class Buffer>
203  packer.pack(array_);
204  }
205  void unpack_array(msgpack::object o) {
206  o.convert(&array_);
207  }
208 
209  private:
210  std::vector<T> array_;
211 };
212 
213 template <>
215  public:
216  explicit typed_column(const column_type& type)
217  : detail::abstract_column_base(type) {
218  }
219 
220  using detail::abstract_column_base::push_back;
221  using detail::abstract_column_base::insert;
222  using detail::abstract_column_base::update;
223 
224  void push_back(const bit_vector& value) {
225  check_bit_vector_(value);
226  array_.resize(array_.size() + blocks_per_value_());
227  update_at_(size() - 1, value.raw_data_unsafe());
228  }
229  void push_back(const msgpack::object& obj) {
230  bit_vector value(type().bit_vector_length());
231  obj.convert(&value);
233  }
234 
235  bool insert(uint64_t target, const bit_vector& value) {
236  check_bit_vector_(value);
237 
238  if (size() < target) {
239  return false;
240  }
241  array_.insert(
242  array_.begin() + target * blocks_per_value_(),
243  blocks_per_value_(), 0);
244  update_at_(target, value.raw_data_unsafe());
245  return true;
246  }
247  bool insert(uint64_t target, const msgpack::object& obj) {
248  bit_vector value(type().bit_vector_length());
249  obj.convert(&value);
250  return typed_column::insert(target, value);
251  }
252 
253  bool update(uint64_t index, const bit_vector& value) {
254  check_bit_vector_(value);
255 
256  if (size() < index) {
257  return false;
258  }
259  update_at_(index, value.raw_data_unsafe());
260  return true;
261  }
262  bool update(uint64_t index, const msgpack::object& obj) {
263  bit_vector value(type().bit_vector_length());
264  obj.convert(&value);
265  return typed_column::update(index, value);
266  }
267 
268  uint64_t size() const {
269  JUBATUS_ASSERT_EQ(array_.size() % blocks_per_value_(), 0u, "");
270  return array_.size() / blocks_per_value_();
271  }
272  bit_vector operator[](uint64_t index) {
273  return bit_vector(get_data_at_(index), type().bit_vector_length());
274  }
275  bit_vector operator[](uint64_t index) const {
276  return bit_vector(get_data_at_(index), type().bit_vector_length());
277  }
278  bool remove(uint64_t target) {
279  if (target >= size()) {
280  return false;
281  }
282  if (target < size() - 1) {
283  const void* back = get_data_at_(size() - 1);
284  memcpy(get_data_at_(target), back, bytes_per_value_());
285  }
286  JUBATUS_ASSERT_GE(array_.size(), blocks_per_value_(), "");
287  array_.resize(array_.size() - blocks_per_value_());
288  return true;
289  }
290  void clear() {
291  array_.clear();
292  }
294  const uint64_t index, framework::packer& pk) const {
295  pk.pack((*this)[index]);
296  }
297 
298  friend std::ostream& operator<<(std::ostream& os,
299  const typed_column<bit_vector>& c) {
300  os << "[column (bit_vector)"
301  << " size:" << c.size() << " {" << std::endl;
302  for (size_t i = 0; i < c.size(); ++i) {
303  c.dump(os, i);
304  }
305  os << "} ]" << std::endl;
306  return os;
307  }
308 
309  void dump(std::ostream& os, uint64_t target) const {
310  os << "[" << target << "] " << (*this)[target] << std::endl;
311  }
312 
313  template<class Buffer>
315  packer.pack(array_);
316  }
317  void unpack_array(msgpack::object o) {
318  o.convert(&array_);
319  }
320 
321  private:
322  std::vector<uint64_t> array_;
323 
324  size_t bytes_per_value_() const {
325  return bit_vector::memory_size(type().bit_vector_length());
326  }
327  size_t blocks_per_value_() const {
328  return (bytes_per_value_() + sizeof(uint64_t) - 1) / sizeof(uint64_t);
329  }
330 
331  uint64_t* get_data_at_(size_t index) {
332  JUBATUS_ASSERT_LT(index, size(), "");
333  return &array_[blocks_per_value_() * index];
334  }
335  const uint64_t* get_data_at_(size_t index) const {
336  JUBATUS_ASSERT_LT(index, size(), "");
337  return &array_[blocks_per_value_() * index];
338  }
339 
340  void update_at_(size_t index, const void* raw_data) {
341  if (raw_data) {
342  memcpy(get_data_at_(index), raw_data, bytes_per_value_());
343  } else {
344  memset(get_data_at_(index), 0, bytes_per_value_());
345  }
346  }
347 
348  void check_bit_vector_(const bit_vector& tested) const {
349  const size_t bit_num_expected = type().bit_vector_length();
350  if (tested.bit_num() > bit_num_expected) {
352  "invalid length of bit_vector (" +
353  jubatus::util::lang::lexical_cast<std::string>(tested.bit_num()) + ", "
354  "expected " +
355  jubatus::util::lang::lexical_cast<std::string>(bit_num_expected) + ")");
356  }
357  }
358 };
359 
372 
384 typedef const bit_vector_column const_bit_vector_column;
385 
386 namespace detail {
387 
389  public:
391  }
392 
393  explicit abstract_column(const column_type& type) {
394  if (type.is(column_type::uint8_type)) {
395  base_.reset(new uint8_column(type));
396  } else if (type.is(column_type::uint16_type)) {
397  base_.reset(new uint16_column(type));
398  } else if (type.is(column_type::uint32_type)) {
399  base_.reset(new uint32_column(type));
400  } else if (type.is(column_type::uint64_type)) {
401  base_.reset(new uint64_column(type));
402  } else if (type.is(column_type::int8_type)) {
403  base_.reset(new int8_column(type));
404  } else if (type.is(column_type::int16_type)) {
405  base_.reset(new int16_column(type));
406  } else if (type.is(column_type::int32_type)) {
407  base_.reset(new int32_column(type));
408  } else if (type.is(column_type::int64_type)) {
409  base_.reset(new int64_column(type));
410  } else if (type.is(column_type::float_type)) {
411  base_.reset(new float_column(type));
412  } else if (type.is(column_type::double_type)) {
413  base_.reset(new double_column(type));
414  } else if (type.is(column_type::string_type)) {
415  base_.reset(new string_column(type));
416  } else if (type.is(column_type::bit_vector_type)) {
417  base_.reset(new bit_vector_column(type));
418  } else {
420  }
421  }
422 
423  column_type type() const {
424  JUBATUS_ASSERT(base_ != NULL);
425  return base_->type();
426  }
427 
428  template <typename T>
429  void push_back(const T& value) {
430  JUBATUS_ASSERT(base_ != NULL);
431  base_->push_back(value);
432  }
433 
434  template <typename T>
435  bool insert(uint64_t index, const T& value) {
436  JUBATUS_ASSERT(base_ != NULL);
437  return base_->insert(index, value);
438  }
439  template <typename T>
440  bool update(uint64_t index, const T& value) {
441  JUBATUS_ASSERT(base_ != NULL);
442  return base_->update(index, value);
443  }
444  bool remove(uint64_t index) {
445  JUBATUS_ASSERT(base_ != NULL);
446  return base_->remove(index);
447  };
448  void clear() {
449  JUBATUS_ASSERT(base_ != NULL);
450  base_->clear();
451  }
453  uint64_t index, framework::packer& pk) const {
454  JUBATUS_ASSERT(base_ != NULL);
455  base_->pack_with_index(index, pk);
456  }
458  return base_.get();
459  }
460  const abstract_column_base* get() const {
461  return base_.get();
462  }
463 
464  void dump(std::ostream& os, uint64_t target) const {
465  JUBATUS_ASSERT(base_ != NULL);
466  base_->dump(os, target);
467  }
468 
470  base_.swap(x.base_);
471  }
472  friend void swap(abstract_column& l, abstract_column& r) {
473  l.swap(r);
474  }
475 
476  template<class Buffer>
478  column_type type = base_->type();
479 
480  packer.pack_array(2);
481  packer.pack(type);
482 
483  if (type.is(column_type::uint8_type)) {
484  static_cast<const uint8_column&>(*base_).pack_array(packer);
485  } else if (type.is(column_type::uint16_type)) {
486  static_cast<const uint16_column&>(*base_).pack_array(packer);
487  } else if (type.is(column_type::uint32_type)) {
488  static_cast<const uint32_column&>(*base_).pack_array(packer);
489  } else if (type.is(column_type::uint64_type)) {
490  static_cast<const uint64_column&>(*base_).pack_array(packer);
491  } else if (type.is(column_type::int8_type)) {
492  static_cast<const int8_column&>(*base_).pack_array(packer);
493  } else if (type.is(column_type::int16_type)) {
494  static_cast<const int16_column&>(*base_).pack_array(packer);
495  } else if (type.is(column_type::int32_type)) {
496  static_cast<const int32_column&>(*base_).pack_array(packer);
497  } else if (type.is(column_type::int64_type)) {
498  static_cast<const int64_column&>(*base_).pack_array(packer);
499  } else if (type.is(column_type::float_type)) {
500  static_cast<const float_column&>(*base_).pack_array(packer);
501  } else if (type.is(column_type::double_type)) {
502  static_cast<const double_column&>(*base_).pack_array(packer);
503  } else if (type.is(column_type::string_type)) {
504  static_cast<const string_column&>(*base_).pack_array(packer);
505  } else if (type.is(column_type::bit_vector_type)) {
506  static_cast<const bit_vector_column&>(*base_).pack_array(packer);
507  } else {
509  }
510  }
511  void msgpack_unpack(msgpack::object o) {
512  if (o.type != msgpack::type::ARRAY || o.via.array.size != 2) {
513  throw msgpack::type_error();
514  }
515  msgpack::object* objs = o.via.array.ptr;
516 
518  objs[0].convert(&type);
519 
520  abstract_column tmp;
521  if (!base_) {
522  abstract_column(type).swap(tmp); // NOLINT
523  } else if (base_->type() == type) {
524  this->swap(tmp);
525  } else {
527  "column: invalid type in serialize(): "
528  "expected: " +
529  jubatus::util::lang::lexical_cast<std::string>(base_->type()) +
530  ", actual: " +
531  jubatus::util::lang::lexical_cast<std::string>(type));
532  }
533 
534  if (type.is(column_type::uint8_type)) {
535  static_cast<uint8_column&>(*tmp.base_).unpack_array(objs[1]);
536  } else if (type.is(column_type::uint16_type)) {
537  static_cast<uint16_column&>(*tmp.base_).unpack_array(objs[1]);
538  } else if (type.is(column_type::uint32_type)) {
539  static_cast<uint32_column&>(*tmp.base_).unpack_array(objs[1]);
540  } else if (type.is(column_type::uint64_type)) {
541  static_cast<uint64_column&>(*tmp.base_).unpack_array(objs[1]);
542  } else if (type.is(column_type::int8_type)) {
543  static_cast<int8_column&>(*tmp.base_).unpack_array(objs[1]);
544  } else if (type.is(column_type::int16_type)) {
545  static_cast<int16_column&>(*tmp.base_).unpack_array(objs[1]);
546  } else if (type.is(column_type::int32_type)) {
547  static_cast<int32_column&>(*tmp.base_).unpack_array(objs[1]);
548  } else if (type.is(column_type::int64_type)) {
549  static_cast<int64_column&>(*tmp.base_).unpack_array(objs[1]);
550  } else if (type.is(column_type::float_type)) {
551  static_cast<float_column&>(*tmp.base_).unpack_array(objs[1]);
552  } else if (type.is(column_type::double_type)) {
553  static_cast<double_column&>(*tmp.base_).unpack_array(objs[1]);
554  } else if (type.is(column_type::string_type)) {
555  static_cast<string_column&>(*tmp.base_).unpack_array(objs[1]);
556  } else if (type.is(column_type::bit_vector_type)) {
557  static_cast<bit_vector_column&>(*tmp.base_).unpack_array(objs[1]);
558  } else {
560  }
561 
562  this->swap(tmp);
563  }
564 
565  private:
566  jubatus::util::lang::shared_ptr<abstract_column_base> base_;
567 };
568 
569 } // namespace detail
570 
571 } // namespace storage
572 } // namespace core
573 } // namespace jubatus
574 
575 #endif // JUBATUS_CORE_STORAGE_ABSTRACT_COLUMN_HPP_
const typed_column< int16_t > const_int16_column
const typed_column< int64_t > const_int64_column
typed_column< std::string > string_column
const uint64_t * get_data_at_(size_t index) const
bool update(uint64_t index, const bit_vector &value)
void dump(std::ostream &os, uint64_t target) const
bool update(uint64_t index, const T &value)
#define JUBATUS_ASSERT_EQ(a, b, messages)
Definition: assert.hpp:63
#define JUBATUS_ASSERT_UNREACHABLE()
Definition: assert.hpp:59
void update_at_(size_t index, const void *raw_data)
typed_column< uint16_t > uint16_column
static size_t memory_size(size_t bit_width)
Definition: bit_vector.hpp:319
void pack_array(msgpack::packer< Buffer > &packer) const
typed_column< uint32_t > uint32_column
const typed_column< uint64_t > const_uint64_column
const typed_column< uint8_t > const_uint8_column
typed_column< float > float_column
typed_column< int32_t > int32_column
bool is(const type_name &type) const
Definition: column_type.hpp:66
bool update(uint64_t index, const T &value)
void dump(std::ostream &os, uint64_t target) const
virtual void dump(std::ostream &os, uint64_t target) const =0
bool insert(uint64_t index, const T &value)
const T & operator[](uint64_t index) const
typed_column< int64_t > int64_column
void msgpack_pack(msgpack::packer< Buffer > &packer) const
const bit_vector_column const_bit_vector_column
const typed_column< int8_t > const_int8_column
void pack_array(msgpack::packer< Buffer > &packer) const
bool insert(uint64_t target, const T &value)
#define JUBATUS_ASSERT(expr)
Definition: assert.hpp:55
bool update(uint64_t index, const msgpack::object &obj)
const typed_column< std::string > const_string_column
const typed_column< double > const_double_column
void pack_with_index(uint64_t index, framework::packer &pk) const
#define JUBATUS_ASSERT_GE(a, b, messages)
Definition: assert.hpp:71
bool insert(uint64_t target, const msgpack::object &obj)
typed_column< uint8_t > uint8_column
typed_column< uint64_t > uint64_column
const typed_column< uint16_t > const_uint16_column
typed_column(const column_type &type)
jubatus::util::lang::shared_ptr< abstract_column_base > base_
void swap(weighted_point &p1, weighted_point &p2)
Definition: types.hpp:47
const typed_column< uint32_t > const_uint32_column
void dump(std::ostream &os, uint64_t target) const
virtual void pack_with_index(const uint64_t index, framework::packer &pk) const
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
bit_vector_base< uint64_t > bit_vector
typed_column< double > double_column
void swap(lsh_vector &l, lsh_vector &r)
Definition: lsh_vector.hpp:61
vector< msgpack::object > objs
bool insert(uint64_t target, const msgpack::object &obj)
bool insert(uint64_t target, const bit_vector &value)
friend void swap(abstract_column &l, abstract_column &r)
void pack_with_index(const uint64_t index, framework::packer &pk) const
friend std::ostream & operator<<(std::ostream &os, const typed_column< bit_vector > &c)
typed_column< int16_t > int16_column
void push_back(const msgpack::object &obj)
typed_column< bit_vector > bit_vector_column
const typed_column< int32_t > const_int32_column
friend std::ostream & operator<<(std::ostream &os, const typed_column< T > &it)
void pack_with_index(const uint64_t index, framework::packer &pk) const
const typed_column< float > const_float_column
void check_bit_vector_(const bit_vector &tested) const
bool update(uint64_t target, const msgpack::object &obj)
typed_column< int8_t > int8_column
#define JUBATUS_ASSERT_LT(a, b, messages)
Definition: assert.hpp:69