jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
column_table.hpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2012,2013 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 #ifndef JUBATUS_CORE_STORAGE_COLUMN_TABLE_HPP_
18 #define JUBATUS_CORE_STORAGE_COLUMN_TABLE_HPP_
19 
20 #include <stdint.h>
21 #include <algorithm>
22 #include <cstring>
23 #include <string>
24 #include <vector>
25 #include <utility>
26 #include <msgpack.hpp>
27 
28 #include "jubatus/util/lang/cast.h"
29 #include "jubatus/util/lang/demangle.h"
30 #include "jubatus/util/data/unordered_map.h"
31 #include "jubatus/util/concurrent/rwmutex.h"
32 #include "jubatus/util/lang/shared_ptr.h"
33 #include "../common/assert.hpp"
34 #include "../common/exception.hpp"
35 #include "../common/unordered_map.hpp"
36 #include "../framework/packer.hpp"
37 #include "storage_exception.hpp"
38 #include "bit_vector.hpp"
39 #include "column_type.hpp"
40 #include "abstract_column.hpp"
41 #include "owner.hpp"
42 
43 namespace jubatus {
44 namespace core {
45 namespace storage {
46 
48  : public common::exception::jubaexception<invalid_row_set> {
49  const char* what() const throw() {
50  return "invalid_row_set";
51  }
52 };
53 
54 
55 class column_table {
56  typedef jubatus::util::data::unordered_map<std::string, uint64_t> index_table;
57 
58  public:
59  typedef std::pair<owner, uint64_t> version_t;
60 
62  : tuples_(0), clock_(0) {
63  }
65  }
66 
67  void init(const std::vector<column_type>& schema);
68  void clear();
69 
70  template<typename T1>
71  bool add(const std::string& key, const owner& o, const T1& v1) {
72  if (columns_.size() != 1) {
74  "tuple's length unmatch, expected " +
75  jubatus::util::lang::lexical_cast<std::string>(tuples_) + " tuples.");
76  }
77  // check already exists
78  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
79  index_table::const_iterator it = index_.find(key);
80  const bool not_found = it == index_.end();
81  if (not_found) {
82  // add tuple
83  keys_.push_back(key);
84  versions_.push_back(std::make_pair(o, clock_));
85  columns_[0].push_back(v1);
86  JUBATUS_ASSERT_EQ(keys_.size(), versions_.size(), "");
87 
88  // make index
89  index_.insert(std::make_pair(key, tuples_));
90  ++tuples_;
91  } else { // key exists
92  const uint64_t index = it->second;
93  versions_[index] = std::make_pair(o, clock_);
94  columns_[0].update(index, v1);
95  }
96  ++clock_;
97  return not_found;
98  }
99 
100  template<typename T1, typename T2>
101  bool add(const std::string& key, const owner& o, const T1& v1, const T2& v2) {
102  if (columns_.size() != 2) {
104  "tuple's length unmatch, expected " +
105  jubatus::util::lang::lexical_cast<std::string>(tuples_) + " tuples.");
106  }
107 
108  // check already exists
109  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
110  index_table::const_iterator it = index_.find(key);
111  const bool not_found = it == index_.end();
112  if (not_found) {
113  // add tuple
114  keys_.push_back(key);
115  versions_.push_back(std::make_pair(o , clock_));
116  columns_[0].push_back(v1);
117  columns_[1].push_back(v2);
118  JUBATUS_ASSERT_EQ(keys_.size(), versions_.size(), "");
119 
120  // make index
121  index_.insert(std::make_pair(key, tuples_));
122  ++tuples_;
123  } else { // key exists
124  const uint64_t index = it->second;
125  versions_[index] = std::make_pair(o, clock_);
126  columns_[0].update(index, v1);
127  columns_[1].update(index, v2);
128  }
129  ++clock_;
130  return not_found;
131  }
132  // more add() will be needed...
133 
134  template<typename T>
135  bool update(
136  const std::string& key,
137  const owner& o,
138  size_t column_id,
139  const T& v) {
140  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
141  index_table::iterator it = index_.find(key);
142  if (tuples_ < column_id || it == index_.end()) {
143  return false;
144  }
145  versions_[it->second] = std::make_pair(o, clock_);
146  columns_[column_id].update(it->second, v);
147  columns_[column_id].update(it->second, v);
148  ++clock_;
149  return true;
150  }
151 
152  std::string get_key(uint64_t key_id) const {
153  jubatus::util::concurrent::scoped_rlock lk(table_lock_);
154  return get_key_nolock(key_id);
155  }
156 
157  std::string get_key_nolock(uint64_t key_id) const {
158  if (tuples_ <= key_id) {
159  return "";
160  }
161  return keys_[key_id];
162  }
163 
164  void scan_clock() {
165  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
166  uint64_t max_clock = 0;
167  for (std::vector<version_t>::const_iterator it = versions_.begin();
168  it != versions_.end(); ++it) {
169  max_clock = std::max(max_clock, it->second);
170  }
171  clock_ = max_clock;
172  }
173 
174  /* get_column methods
175  e.g. get_int8_column(), get_float_column(), get_bit_vector_column()...
176  The argument is the column_id.
177  If the column's type does not match, type_unmatch_exception is thrown.
178  */
179  uint8_column& get_uint8_column(size_t column_id);
180  uint16_column& get_uint16_column(size_t column_id);
181  uint32_column& get_uint32_column(size_t column_id);
182  uint64_column& get_uint64_column(size_t column_id);
183  int8_column& get_int8_column(size_t column_id);
184  int16_column& get_int16_column(size_t column_id);
185  int32_column& get_int32_column(size_t column_id);
186  int64_column& get_int64_column(size_t column_id);
187  float_column& get_float_column(size_t column_id);
188  double_column& get_double_column(size_t column_id);
189  string_column& get_string_column(size_t column_id);
190  bit_vector_column& get_bit_vector_column(size_t column_id);
191 
192  const_uint8_column& get_uint8_column(size_t column_id) const;
193  const_uint16_column& get_uint16_column(size_t column_id) const;
194  const_uint32_column& get_uint32_column(size_t column_id) const;
195  const_uint64_column& get_uint64_column(size_t column_id) const;
196  const_int8_column& get_int8_column(size_t column_id) const;
197  const_int16_column& get_int16_column(size_t column_id) const;
198  const_int32_column& get_int32_column(size_t column_id) const;
199  const_int64_column& get_int64_column(size_t column_id) const;
200  const_float_column& get_float_column(size_t column_id) const;
201  const_double_column& get_double_column(size_t column_id) const;
202  const_string_column& get_string_column(size_t column_id) const;
203  const_bit_vector_column& get_bit_vector_column(size_t column_id) const;
204 
205  uint64_t size() const {
206  jubatus::util::concurrent::scoped_rlock lk(table_lock_);
207  return tuples_;
208  }
209 
210  std::string dump_json() const {
211  jubatus::util::concurrent::scoped_rlock lk(table_lock_);
212  std::stringstream ss;
213  ss << tuples_;
214  return ss.str();
215  }
216 
217  std::pair<bool, uint64_t> exact_match(const std::string& prefix) const;
218 
219  friend std::ostream& operator<<(std::ostream& os, const column_table& tbl) {
220  jubatus::util::concurrent::scoped_rlock lk(tbl.table_lock_);
221  os << "total size:" << tbl.tuples_ << std::endl;
222  os << "types: vesions|";
223  for (size_t j = 0; j < tbl.columns_.size(); ++j) {
224  os << tbl.columns_[j].type().type_as_string() << "\t|";
225  }
226  os << std::endl;
227  for (uint64_t i = 0; i < tbl.tuples_; ++i) {
228  os << tbl.keys_[i] << ":" <<
229  tbl.versions_[i].first << ":" << tbl.versions_[i].second << "\t|";
230  for (size_t j = 0; j < tbl.columns_.size(); ++j) {
231  tbl.columns_[j].dump(os, i);
232  os << "\t|";
233  }
234  os << std::endl;
235  }
236  return os;
237  }
238 
239  version_t get_version(uint64_t index) const {
240  jubatus::util::concurrent::scoped_rlock lk(table_lock_);
241  return versions_[index];
242  }
243 
244  void get_row(const uint64_t id, framework::packer& pk) const {
245  jubatus::util::concurrent::scoped_rlock lk(table_lock_);
246  JUBATUS_ASSERT_GE(tuples_, id, "specified index is bigger than table size");
247  pk.pack_array(3); // [key, [owner, id], [data]]
248  pk.pack(keys_[id]); // key
249  pk.pack(versions_[id]); // [version]
250  pk.pack_array(columns_.size());
251  for (size_t i = 0; i < columns_.size(); ++i) {
252  columns_[i].pack_with_index(id, pk);
253  }
254  }
255 
256  version_t set_row(const msgpack::object& o) {
257  if (o.type != msgpack::type::ARRAY || o.via.array.size != 3) {
258  throw msgpack::type_error();
259  }
260 
261  const std::string& key = o.via.array.ptr[0].as<std::string>();
262  version_t set_version = o.via.array.ptr[1].as<version_t>();
263 
264  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
265  const msgpack::object& dat = o.via.array.ptr[2];
266  index_table::iterator it = index_.find(key);
267  if (it == index_.end()) { // did not exist, append
268  if (dat.via.array.size != columns_.size()) {
269  throw std::bad_cast();
270  }
271 
272  // add tuple
273  keys_.push_back(key);
274  versions_.push_back(set_version);
275  for (size_t i = 0; i < columns_.size(); ++i) {
276  columns_[i].push_back(dat.via.array.ptr[i]);
277  }
278  JUBATUS_ASSERT_EQ(keys_.size(), versions_.size(), "");
279 
280  // make index
281  index_.insert(std::make_pair(key, tuples_));
282  ++tuples_;
283  } else { // already exist, overwrite if needed
284  const uint64_t target = it->second;
285 
286  if (dat.via.array.size != columns_.size()) {
287  throw std::bad_cast();
288  }
289 
290  // overwrite tuple if needed
291  if (versions_[target].second <= set_version.second) {
292  // needed!!
293  versions_[target] = set_version;
294  for (size_t i = 0; i < columns_.size(); ++i) {
295  columns_[i].update(target, dat.via.array.ptr[i]);
296  }
297  // make index
298  index_.insert(std::make_pair(key, tuples_));
299  }
300  }
301  if (clock_ <= set_version.second) {
302  clock_ = set_version.second + 1;
303  }
304  return set_version;
305  }
306 
307  bool update_clock(const std::string& target, const owner& o) {
308  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
309  index_table::const_iterator it = index_.find(target);
310  if (it == index_.end()) {
311  return false;
312  }
313  versions_[it->second] = std::make_pair(o, clock_);
314  ++clock_;
315  return true;
316  }
317 
318  bool update_clock(const uint64_t index, const owner& o) {
319  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
320  if (size() < index) {
321  return false;
322  }
323  versions_[index] = std::make_pair(o, clock_);
324  ++clock_;
325  return true;
326  }
327 
328  version_t get_clock(const std::string& target) const {
329  jubatus::util::concurrent::scoped_rlock lk(table_lock_);
330  index_table::const_iterator it = index_.find(target);
331  if (it == index_.end()) {
332  return version_t();
333  }
334  return versions_[it->second];
335  }
336 
337  version_t get_clock(const uint64_t index) const {
338  jubatus::util::concurrent::scoped_rlock lk(table_lock_);
339  if (size() < index) {
340  return version_t();
341  }
342  return versions_[index];
343  }
344 
345  bool delete_row(const std::string& target) {
346  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
347  index_table::const_iterator it = index_.find(target);
348  if (it == index_.end()) {
349  return false;
350  }
351  delete_row_(it->second);
352  return true;
353  }
354 
355  bool delete_row(uint64_t index) {
356  jubatus::util::concurrent::scoped_wlock lk(table_lock_);
357  if (size() <= index) {
358  return false;
359  }
360  delete_row_(index);
361  return true;
362  }
363 
364  util::concurrent::rw_mutex& get_mutex() const {
365  return table_lock_;
366  }
367 
369 
371  packer.pack(*this);
372  }
373 
374  void unpack(msgpack::object o) {
375  o.convert(this);
376  }
377 
378  private:
379  std::vector<std::string> keys_;
380  std::vector<version_t> versions_;
381  std::vector<detail::abstract_column> columns_;
382  mutable jubatus::util::concurrent::rw_mutex table_lock_;
383  uint64_t tuples_;
384  uint64_t clock_;
385  index_table index_;
386 
387  void delete_row_(uint64_t index) {
388  JUBATUS_ASSERT_LT(index, size(), "");
389 
390  for (std::vector<detail::abstract_column>::iterator jt = columns_.begin();
391  jt != columns_.end();
392  ++jt) {
393  jt->remove(index);
394  }
395  { // needs swap on last index
396  index_table::iterator move_it = index_.find(keys_[tuples_ - 1]);
397  move_it->second = index;
398  index_.erase(keys_[index]);
399  }
400 
401  if (index + 1 != keys_.size()) {
402  std::swap(keys_[index], keys_.back());
403  }
404  keys_.pop_back();
405 
406  if (index + 1 != versions_.size()) {
407  std::swap(versions_[index], versions_.back());
408  }
409  versions_.pop_back();
410 
411  --tuples_;
412  ++clock_;
413 
414  JUBATUS_ASSERT_EQ(tuples_, index_.size(), "");
415  JUBATUS_ASSERT_EQ(tuples_, keys_.size(), "");
416  JUBATUS_ASSERT_EQ(tuples_, versions_.size(), "");
417  }
418 };
419 
420 } // namespace storage
421 } // namespace core
422 } // namespace jubatus
423 
424 #endif // JUBATUS_CORE_STORAGE_COLUMN_TABLE_HPP_
uint64_column & get_uint64_column(size_t column_id)
#define JUBATUS_ASSERT_EQ(a, b, messages)
Definition: assert.hpp:63
float_column & get_float_column(size_t column_id)
uint8_column & get_uint8_column(size_t column_id)
int64_column & get_int64_column(size_t column_id)
int32_column & get_int32_column(size_t column_id)
version_t set_row(const msgpack::object &o)
bool delete_row(const std::string &target)
int16_column & get_int16_column(size_t column_id)
jubatus::util::data::unordered_map< std::string, uint64_t > index_table
std::pair< bool, uint64_t > exact_match(const std::string &prefix) const
uint32_column & get_uint32_column(size_t column_id)
std::string get_key_nolock(uint64_t key_id) const
version_t get_version(uint64_t index) const
void get_row(const uint64_t id, framework::packer &pk) const
void init(const std::vector< column_type > &schema)
string_column & get_string_column(size_t column_id)
version_t get_clock(const uint64_t index) const
#define JUBATUS_ASSERT_GE(a, b, messages)
Definition: assert.hpp:71
bit_vector_column & get_bit_vector_column(size_t column_id)
version_t get_clock(const std::string &target) const
std::vector< version_t > versions_
void swap(weighted_point &p1, weighted_point &p2)
Definition: types.hpp:47
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
bool update_clock(const uint64_t index, const owner &o)
std::vector< std::string > keys_
util::concurrent::rw_mutex & get_mutex() const
int8_column & get_int8_column(size_t column_id)
std::vector< T > v(size)
MSGPACK_DEFINE(keys_, tuples_, versions_, columns_, clock_, index_)
std::pair< owner, uint64_t > version_t
jubatus::util::concurrent::rw_mutex table_lock_
bool update_clock(const std::string &target, const owner &o)
bool update(const std::string &key, const owner &o, size_t column_id, const T &v)
friend std::ostream & operator<<(std::ostream &os, const column_table &tbl)
bool add(const std::string &key, const owner &o, const T1 &v1)
void pack(framework::packer &packer) const
double_column & get_double_column(size_t column_id)
std::vector< detail::abstract_column > columns_
uint16_column & get_uint16_column(size_t column_id)
std::string get_key(uint64_t key_id) const
#define JUBATUS_ASSERT_LT(a, b, messages)
Definition: assert.hpp:69
bool add(const std::string &key, const owner &o, const T1 &v1, const T2 &v2)