17 #ifndef JUBATUS_CORE_STORAGE_COLUMN_TABLE_HPP_
18 #define JUBATUS_CORE_STORAGE_COLUMN_TABLE_HPP_
26 #include <msgpack.hpp>
28 #include "jubatus/util/lang/cast.h"
29 #include "jubatus/util/lang/demangle.h"
30 #include "jubatus/util/data/unordered_map.h"
31 #include "jubatus/util/concurrent/rwmutex.h"
32 #include "jubatus/util/lang/shared_ptr.h"
33 #include "../common/assert.hpp"
34 #include "../common/exception.hpp"
35 #include "../common/unordered_map.hpp"
36 #include "../framework/packer.hpp"
49 const char*
what()
const throw() {
50 return "invalid_row_set";
// Lookup table type for index_: maps a row key (std::string) to a uint64_t.
// Elsewhere in this file the mapped value (it->second) is used as the row
// position passed to columns_[...].update() and used to index versions_.
56 typedef jubatus::util::data::unordered_map<std::string, uint64_t>
index_table;
// Declaration only; the definition is not visible in this header section.
// Takes the table schema as a vector of column_type.
// NOTE(review): presumably sets up one column per schema entry — confirm
// against the definition.
67 void init(
const std::vector<column_type>& schema);
71 bool add(
const std::string& key,
const owner& o,
const T1& v1) {
74 "tuple's length unmatch, expected " +
75 jubatus::util::lang::lexical_cast<std::string>(
tuples_) +
" tuples.");
78 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
79 index_table::const_iterator it =
index_.find(key);
80 const bool not_found = it ==
index_.end();
92 const uint64_t index = it->second;
100 template<
typename T1,
typename T2>
101 bool add(
const std::string& key,
const owner& o,
const T1& v1,
const T2& v2) {
104 "tuple's length unmatch, expected " +
105 jubatus::util::lang::lexical_cast<std::string>(
tuples_) +
" tuples.");
109 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
110 index_table::const_iterator it =
index_.find(key);
111 const bool not_found = it ==
index_.end();
114 keys_.push_back(key);
124 const uint64_t index = it->second;
136 const std::string& key,
140 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
141 index_table::iterator it =
index_.find(key);
// NOTE(review): the same update(it->second, v) call on columns_[column_id]
// is issued twice back to back with identical arguments. The second call
// looks redundant — confirm it is not intentional and drop one.
146 columns_[column_id].update(it->second, v);
147 columns_[column_id].update(it->second, v);
153 jubatus::util::concurrent::scoped_rlock lk(
table_lock_);
161 return keys_[key_id];
165 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
166 uint64_t max_clock = 0;
167 for (std::vector<version_t>::const_iterator it =
versions_.begin();
169 max_clock = std::max(max_clock, it->second);
206 jubatus::util::concurrent::scoped_rlock lk(
table_lock_);
211 jubatus::util::concurrent::scoped_rlock lk(
table_lock_);
212 std::stringstream ss;
// Declaration only; the definition is not visible in this header section.
// Const member taking a string (named `prefix`) and returning a
// (bool, uint64_t) pair.
// NOTE(review): presumably first = whether a matching key exists and
// second = its row index — confirm against the definition.
217 std::pair<bool, uint64_t>
exact_match(
const std::string& prefix)
const;
220 jubatus::util::concurrent::scoped_rlock lk(tbl.
table_lock_);
221 os <<
"total size:" << tbl.
tuples_ << std::endl;
222 os <<
"types: vesions|";
223 for (
size_t j = 0; j < tbl.
columns_.size(); ++j) {
224 os << tbl.
columns_[j].type().type_as_string() <<
"\t|";
227 for (uint64_t i = 0; i < tbl.
tuples_; ++i) {
228 os << tbl.
keys_[i] <<
":" <<
230 for (
size_t j = 0; j < tbl.
columns_.size(); ++j) {
240 jubatus::util::concurrent::scoped_rlock lk(
table_lock_);
245 jubatus::util::concurrent::scoped_rlock lk(
table_lock_);
251 for (
size_t i = 0; i <
columns_.size(); ++i) {
252 columns_[i].pack_with_index(
id, pk);
257 if (o.type != msgpack::type::ARRAY || o.via.array.size != 3) {
258 throw msgpack::type_error();
261 const std::string& key = o.via.array.ptr[0].as<std::string>();
262 version_t set_version = o.via.array.ptr[1].as<version_t>();
264 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
265 const msgpack::object& dat = o.via.array.ptr[2];
266 index_table::iterator it =
index_.find(key);
268 if (dat.via.array.size !=
columns_.size()) {
269 throw std::bad_cast();
273 keys_.push_back(key);
275 for (
size_t i = 0; i <
columns_.size(); ++i) {
276 columns_[i].push_back(dat.via.array.ptr[i]);
284 const uint64_t target = it->second;
286 if (dat.via.array.size !=
columns_.size()) {
287 throw std::bad_cast();
291 if (
versions_[target].second <= set_version.second) {
294 for (
size_t i = 0; i <
columns_.size(); ++i) {
295 columns_[i].update(target, dat.via.array.ptr[i]);
301 if (
clock_ <= set_version.second) {
302 clock_ = set_version.second + 1;
308 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
309 index_table::const_iterator it =
index_.find(target);
319 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
// NOTE(review): `size() < index` lets index == size() through, while another
// index check later in this file uses `size() <= index`. Confirm which bound
// is intended.
// NOTE(review): size() is called here while table_lock_ is already held by
// the scoped_wlock above; if size() acquires table_lock_ itself this
// re-enters the lock — TODO confirm.
320 if (
size() < index) {
329 jubatus::util::concurrent::scoped_rlock lk(
table_lock_);
330 index_table::const_iterator it =
index_.find(target);
338 jubatus::util::concurrent::scoped_rlock lk(
table_lock_);
// NOTE(review): `size() < index` lets index == size() through, while another
// index check later in this file uses `size() <= index`. Confirm which bound
// is intended.
339 if (
size() < index) {
346 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
347 index_table::const_iterator it =
index_.find(target);
356 jubatus::util::concurrent::scoped_wlock lk(
table_lock_);
357 if (
size() <= index) {
390 for (std::vector<detail::abstract_column>::iterator jt = columns_.begin();
391 jt != columns_.end();
396 index_table::iterator move_it = index_.find(keys_[tuples_ - 1]);
397 move_it->second = index;
398 index_.erase(keys_[index]);
401 if (index + 1 != keys_.size()) {
406 if (index + 1 != versions_.size()) {
407 std::swap(versions_[index], versions_.back());
409 versions_.pop_back();
424 #endif // JUBATUS_CORE_STORAGE_COLUMN_TABLE_HPP_
uint64_column & get_uint64_column(size_t column_id)
#define JUBATUS_ASSERT_EQ(a, b, messages)
bool delete_row(uint64_t index)
float_column & get_float_column(size_t column_id)
void unpack(msgpack::object o)
uint8_column & get_uint8_column(size_t column_id)
int64_column & get_int64_column(size_t column_id)
int32_column & get_int32_column(size_t column_id)
void delete_row_(uint64_t index)
std::string dump_json() const
version_t set_row(const msgpack::object &o)
bool delete_row(const std::string &target)
int16_column & get_int16_column(size_t column_id)
jubatus::util::data::unordered_map< std::string, uint64_t > index_table
std::pair< bool, uint64_t > exact_match(const std::string &prefix) const
uint32_column & get_uint32_column(size_t column_id)
std::string get_key_nolock(uint64_t key_id) const
version_t get_version(uint64_t index) const
void get_row(const uint64_t id, framework::packer &pk) const
void init(const std::vector< column_type > &schema)
string_column & get_string_column(size_t column_id)
version_t get_clock(const uint64_t index) const
#define JUBATUS_ASSERT_GE(a, b, messages)
bit_vector_column & get_bit_vector_column(size_t column_id)
version_t get_clock(const std::string &target) const
std::vector< version_t > versions_
void swap(weighted_point &p1, weighted_point &p2)
msgpack::packer< jubatus_packer > packer
bool update_clock(const uint64_t index, const owner &o)
std::vector< std::string > keys_
util::concurrent::rw_mutex & get_mutex() const
int8_column & get_int8_column(size_t column_id)
MSGPACK_DEFINE(keys_, tuples_, versions_, columns_, clock_, index_)
std::pair< owner, uint64_t > version_t
jubatus::util::concurrent::rw_mutex table_lock_
bool update_clock(const std::string &target, const owner &o)
bool update(const std::string &key, const owner &o, size_t column_id, const T &v)
friend std::ostream & operator<<(std::ostream &os, const column_table &tbl)
bool add(const std::string &key, const owner &o, const T1 &v1)
void pack(framework::packer &packer) const
double_column & get_double_column(size_t column_id)
std::vector< detail::abstract_column > columns_
const char * what() const
uint16_column & get_uint16_column(size_t column_id)
std::string get_key(uint64_t key_id) const
#define JUBATUS_ASSERT_LT(a, b, messages)
bool add(const std::string &key, const owner &o, const T1 &v1, const T2 &v2)