23 #include <msgpack.hpp>
24 #include "../common/exception.hpp"
25 #include "../common/byte_buffer.hpp"
26 #include "../framework/packer.hpp"
27 #include "../framework/stream_writer.hpp"
31 using jubatus::util::lang::shared_ptr;
42 struct internal_diff : framework::diff_object_raw {
44 msgpack::sbuffer data;
45 core::framework::stream_writer<msgpack::sbuffer> st(data);
46 core::framework::jubatus_packer jp(st);
50 pk.pack_raw(data.size());
51 pk.pack_raw_body(data.data(), data.size());
54 vector<msgpack::object>
objs;
55 vector<shared_ptr<msgpack::zone> >
zones;
61 const msgpack::object& obj)
const {
62 if (obj.type != msgpack::type::RAW) {
66 internal_diff* diff =
new internal_diff;
69 msgpack::unpacked msg;
70 msgpack::unpack(&msg, obj.via.raw.ptr, obj.via.raw.size);
72 msg.get().convert(&diff->objs);
73 if (!diff->objs.empty()) {
74 shared_ptr<msgpack::zone> owner(msg.zone().release());
75 diff->zones.push_back(owner);
81 msgpack::sbuffer data;
88 pk.pack_raw(data.size());
89 pk.pack_raw_body(data.data(), data.size());
93 internal_diff* diff_obj =
dynamic_cast<internal_diff*
>(ptr.get());
100 obj.type = msgpack::type::ARRAY;
101 obj.via.array.ptr = &diff_obj->objs[0];
102 obj.via.array.size = diff_obj->objs.size();
110 const msgpack::object& obj,
112 internal_diff* diff_obj =
dynamic_cast<internal_diff*
>(ptr.get());
118 if (obj.type != msgpack::type::RAW) {
122 msgpack::unpacked msg;
123 msgpack::unpack(&msg, obj.via.raw.ptr, obj.via.raw.size);
124 msgpack::object o = msg.get();
125 if (o.type != msgpack::type::ARRAY) {
129 for (
size_t i = 0; i < o.via.array.size; i++) {
130 diff_obj->objs.push_back(o.via.array.ptr[i]);
133 if (o.via.array.size > 0) {
134 shared_ptr<msgpack::zone> owner(msg.zone().release());
135 diff_obj->zones.push_back(owner);
144 const msgpack::object& arg,
159 const uint64_t table_size = table->size();
161 size_t pack_size = 0;
162 for (uint64_t i = 0; i < table_size; ++i) {
163 const version_t version = table->get_version(i);
164 version_clock::const_iterator it = vc.find(version.first);
165 if (it == vc.end() || it->second < version.second) {
169 pk.pack_array(pack_size);
171 for (uint64_t i = 0; i < table_size; ++i) {
172 const version_t version = table->get_version(i);
173 version_clock::const_iterator it = vc.find(version.first);
174 if (it == vc.end() || it->second < version.second) {
175 table->get_row(i, pk);
181 const msgpack::object& o) {
183 if (o.type != msgpack::type::ARRAY) {
187 for (uint64_t i = 0; i < o.via.array.size; ++i) {
188 const version_t version = table->set_row(o.via.array.ptr[i]);
194 version_clock::iterator it =
vc_.find(version.first);
195 if (it ==
vc_.end()) {
197 }
else if (it->second < version.second) {
198 it->second = version.second;
vector< shared_ptr< msgpack::zone > > zones
bool put_diff(const framework::diff_object &obj)
model_ptr get_model() const
void get_diff(framework::packer &) const
jubatus::util::lang::shared_ptr< diff_object_raw > diff_object
void pull_impl(const version_clock &vc, framework::packer &) const
void push_impl(const msgpack::object &)
#define JUBATUS_EXCEPTION(e)
void update_version(const storage::column_table::version_t &version)
void get_argument(framework::packer &pk) const
msgpack::packer< jubatus_packer > packer
vector< msgpack::object > objs
void pull(const msgpack::object &arg, framework::packer &pk) const
std::pair< owner, uint64_t > version_t
jubatus::core::storage::column_table::version_t version_t
jubatus::util::lang::shared_ptr< storage::column_table > model_ptr
framework::diff_object convert_diff_object(const msgpack::object &) const
void mix(const msgpack::object &obj, framework::diff_object) const
std::map< storage::owner, uint64_t > version_clock
void push(const msgpack::object &diff)