jubatus_core  0.1.2
Jubatus: Online machine learning framework for distributed environment
mixable_versioned_table.cpp
Go to the documentation of this file.
1 // Jubatus: Online machine learning framework for distributed environment
2 // Copyright (C) 2013 Preferred Networks and Nippon Telegraph and Telephone Corporation.
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License version 2.1 as published by the Free Software Foundation.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
12 //
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 
18 
19 #include <algorithm>
20 #include <string>
21 #include <vector>
22 #include <memory>
23 #include <msgpack.hpp>
24 #include "../common/exception.hpp"
25 #include "../common/byte_buffer.hpp"
26 #include "../framework/packer.hpp"
27 #include "../framework/stream_writer.hpp"
28 
30 
31 using jubatus::util::lang::shared_ptr;
35 using std::vector;
36 
37 namespace jubatus {
38 namespace core {
39 namespace framework {
40 namespace {
41 
42 struct internal_diff : framework::diff_object_raw {
43  void convert_binary(framework::packer& pk) const {
44  msgpack::sbuffer data;
45  core::framework::stream_writer<msgpack::sbuffer> st(data);
46  core::framework::jubatus_packer jp(st);
48  p.pack(objs);
49 
50  pk.pack_raw(data.size());
51  pk.pack_raw_body(data.data(), data.size());
52  }
53 
54  vector<msgpack::object> objs;
55  vector<shared_ptr<msgpack::zone> > zones;
56 };
57 
58 } // namespace
59 
61  const msgpack::object& obj) const {
62  if (obj.type != msgpack::type::RAW) {
63  throw JUBATUS_EXCEPTION(
64  core::common::exception::runtime_error("bad diff_object"));
65  }
66  internal_diff* diff = new internal_diff;
67  diff_object diff_obj(diff);
68 
69  msgpack::unpacked msg;
70  msgpack::unpack(&msg, obj.via.raw.ptr, obj.via.raw.size);
71 
72  msg.get().convert(&diff->objs);
73  if (!diff->objs.empty()) {
74  shared_ptr<msgpack::zone> owner(msg.zone().release());
75  diff->zones.push_back(owner);
76  }
77  return diff_obj;
78 }
79 
81  msgpack::sbuffer data;
85  pull_impl(vc_, p);
86 
87  // Wrap msgpack binary more for holding msgpack::zone in internal diff_object.
88  pk.pack_raw(data.size());
89  pk.pack_raw_body(data.data(), data.size());
90 }
91 
93  internal_diff* diff_obj = dynamic_cast<internal_diff*>(ptr.get());
94  if (!diff_obj) {
95  throw JUBATUS_EXCEPTION(
96  core::common::exception::runtime_error("bad diff_object"));
97  }
98 
99  msgpack::object obj;
100  obj.type = msgpack::type::ARRAY;
101  obj.via.array.ptr = &diff_obj->objs[0];
102  obj.via.array.size = diff_obj->objs.size();
103 
104  push_impl(obj);
105 
106  return true;
107 }
108 
110  const msgpack::object& obj,
111  framework::diff_object ptr) const {
112  internal_diff* diff_obj = dynamic_cast<internal_diff*>(ptr.get());
113  if (!diff_obj) {
114  throw JUBATUS_EXCEPTION(
115  core::common::exception::runtime_error("bad diff_object"));
116  }
117 
118  if (obj.type != msgpack::type::RAW) {
119  throw JUBATUS_EXCEPTION(
120  core::common::exception::runtime_error("bad diff_object"));
121  }
122  msgpack::unpacked msg;
123  msgpack::unpack(&msg, obj.via.raw.ptr, obj.via.raw.size);
124  msgpack::object o = msg.get();
125  if (o.type != msgpack::type::ARRAY) {
126  throw JUBATUS_EXCEPTION(
127  core::common::exception::runtime_error("bad diff_object"));
128  }
129  for (size_t i = 0; i < o.via.array.size; i++) {
130  diff_obj->objs.push_back(o.via.array.ptr[i]);
131  }
132 
133  if (o.via.array.size > 0) {
134  shared_ptr<msgpack::zone> owner(msg.zone().release());
135  diff_obj->zones.push_back(owner);
136  }
137 }
138 
140  pk.pack(vc_);
141 }
142 
144  const msgpack::object& arg,
145  framework::packer& pk) const {
146  version_clock vc;
147  arg.convert(&vc);
148  pull_impl(vc, pk);
149 }
150 
151 void mixable_versioned_table::push(const msgpack::object& diff) {
152  push_impl(diff);
153 }
154 
156  const version_clock& vc, framework::packer& pk) const {
157 
158  model_ptr table = get_model();
159  const uint64_t table_size = table->size();
160 
161  size_t pack_size = 0;
162  for (uint64_t i = 0; i < table_size; ++i) {
163  const version_t version = table->get_version(i);
164  version_clock::const_iterator it = vc.find(version.first);
165  if (it == vc.end() || it->second < version.second) {
166  pack_size++;
167  }
168  }
169  pk.pack_array(pack_size);
170 
171  for (uint64_t i = 0; i < table_size; ++i) {
172  const version_t version = table->get_version(i);
173  version_clock::const_iterator it = vc.find(version.first);
174  if (it == vc.end() || it->second < version.second) {
175  table->get_row(i, pk);
176  }
177  }
178 }
179 
181  const msgpack::object& o) {
182  model_ptr table = get_model();
183  if (o.type != msgpack::type::ARRAY) {
184  throw JUBATUS_EXCEPTION(
185  core::common::exception::runtime_error("bad diff_object"));
186  }
187  for (uint64_t i = 0; i < o.via.array.size; ++i) {
188  const version_t version = table->set_row(o.via.array.ptr[i]);
189  update_version(version);
190  }
191 }
192 
194  version_clock::iterator it = vc_.find(version.first);
195  if (it == vc_.end()) {
196  vc_.insert(version);
197  } else if (it->second < version.second) {
198  it->second = version.second;
199  }
200 }
201 
202 } // namespace framework
203 } // namespace core
204 } // namespace jubatus
vector< shared_ptr< msgpack::zone > > zones
bool put_diff(const framework::diff_object &obj)
jubatus::util::lang::shared_ptr< diff_object_raw > diff_object
void pull_impl(const version_clock &vc, framework::packer &) const
#define JUBATUS_EXCEPTION(e)
Definition: exception.hpp:79
void update_version(const storage::column_table::version_t &version)
msgpack::packer< jubatus_packer > packer
Definition: bandit_base.hpp:31
vector< msgpack::object > objs
void pull(const msgpack::object &arg, framework::packer &pk) const
std::pair< owner, uint64_t > version_t
jubatus::core::storage::column_table::version_t version_t
jubatus::util::lang::shared_ptr< storage::column_table > model_ptr
framework::diff_object convert_diff_object(const msgpack::object &) const
void mix(const msgpack::object &obj, framework::diff_object) const
std::map< storage::owner, uint64_t > version_clock