00001
00002
00003
00004
00005
00006
00007
00008
00009 #ifndef SAGA_COMM_STREAM_STREAM_HPP
00010 #define SAGA_COMM_STREAM_STREAM_HPP
00011
00012
00013 #include <string>
00014 #include <vector>
00015
00016
00017 #include <saga/saga/util.hpp>
00018 #include <saga/saga/call.hpp>
00019 #include <saga/saga/base.hpp>
00020 #include <saga/saga/session.hpp>
00021 #include <saga/saga/task.hpp>
00022 #include <saga/saga/buffer.hpp>
00023 #include <saga/saga/url.hpp>
00024 #include <saga/saga/context.hpp>
00025
00026 #include <saga/saga/detail/attribute.hpp>
00027 #include <saga/saga/detail/monitorable.hpp>
00028
00029
00030 #include <saga/saga/packages/stream/config.hpp>
00031
00032
00033 #if defined(BOOST_MSVC)
00034 #pragma warning(push)
00035 #pragma warning(disable: 4251 4231 4660)
00036 #endif
00037
00038 namespace saga
00039 {
00042 namespace stream {
00043
// String keys of the attributes defined for the stream package.
// NOTE(review): presumably consumed through the saga::detail::attribute
// interface that stream derives from -- confirm against the implementation.
namespace attributes
{
    // Buffering / timing related keys.
    const char* const stream_bufsize     = "Bufsize";
    const char* const stream_timeout     = "Timeout";

    // Transport behaviour related keys.
    const char* const stream_blocking    = "Blocking";
    const char* const stream_compression = "Compression";
    const char* const stream_nodelay     = "Nodelay";
    const char* const stream_reliable    = "Reliable";
}
00056
// String names of the metrics defined for the stream package; every name
// carries the "stream." prefix.
// NOTE(review): presumably consumed through the saga::detail::monitorable
// interface that stream derives from -- confirm against the implementation.
namespace metrics
{
    const char* const stream_state     = "stream.State";
    const char* const stream_read      = "stream.Read";
    const char* const stream_write     = "stream.Write";
    const char* const stream_exception = "stream.Exception";
    const char* const stream_dropped   = "stream.Dropped";
}
00068
// State values of a stream. Unknown is the only negative value; the
// remaining enumerators are numbered consecutively starting at 1.
enum state
{
    Unknown = -1,
    New     =  1,
    Open,        // == 2
    Closed,      // == 3
    Dropped,     // == 4
    Error        // == 5
};
00090
// Activity kinds that can be passed to stream::wait(). Each enumerator is a
// distinct power of two.
// NOTE(review): presumably so that several activities can be OR-ed into one
// value -- confirm against the implementation.
enum activity
{
    Read      = 1 << 0,   // == 1
    Write     = 1 << 1,   // == 2
    Exception = 1 << 2    // == 4
};
00105
// Handle class of the SAGA stream package. It derives from saga::object and
// from the attribute and monitorable bases, both templated on stream itself.
//
// Most operations below come as an inline synchronous wrapper that runs the
// matching <name>priv member with saga::task_base::Sync() and unwraps the
// resulting task, followed by a SAGA_CALL_* macro line. The macros are
// defined outside this file; presumably they declare the <name>priv members
// and the additional task-returning overloads -- confirm in saga/call.hpp.
00110 class SAGA_STREAM_PACKAGE_EXPORT stream
00111 : public saga::object,
00112 public saga::detail::attribute<stream>,
00113 public saga::detail::monitorable<stream>
00114 {
00116
// The two mixin bases are friends of this class.
// NOTE(review): presumably so they can reach the protected get_impl
// accessors -- confirm.
00117 friend struct saga::detail::attribute<stream>;
00118 friend struct saga::detail::monitorable<stream>;
00119
// Shorthand names for the two mixin bases (private to the class).
00120 typedef saga::detail::attribute<stream> attribute_base_type;
00121 typedef saga::detail::monitorable<stream> monitorable_base_type;
00123
00124 public:
00125
00130
00131
00132 protected:
00134
// Accessors for the implementation object, as a shared pointer or as a raw
// pointer.
00135 TR1::shared_ptr <saga::impl::stream> get_impl_sp(void) const;
00136 saga::impl::stream* get_impl (void) const;
// The implementation class and the create_default helper may use the
// protected constructors below.
00137 friend class saga::impl::stream;
00138 friend struct saga::detail::create_default<stream>;
00139
// Construct a handle from an existing implementation pointer.
00140 explicit stream (saga::impl::stream *);
// Constructor taking an int; what the argument means is not visible in this
// header.
00141 explicit stream (int);
00143
00144 private:
00145
// Private per-operation macro lines. The inline wrappers further down call
// get_urlpriv, get_contextpriv, connectpriv, waitpriv, closepriv, readpriv
// and writepriv, so these macros presumably declare those members --
// confirm against the macro definitions.
00146 SAGA_CALL_CREATE_PRIV_2(session const&, saga::url)
00147
00148
00149 SAGA_CALL_CONST_PRIV_0(get_url)
00150 SAGA_CALL_CONST_PRIV_0(get_context)
00151
00152
00153 SAGA_CALL_PRIV_0(connect)
00154 SAGA_CALL_PRIV_2(wait, saga::stream::activity, double)
00155 SAGA_CALL_PRIV_1(close, double)
00156 SAGA_CALL_PRIV_2(read, saga::mutable_buffer, saga::ssize_t)
00157 SAGA_CALL_PRIV_2(write, saga::const_buffer, saga::ssize_t)
00158
// Set-up helpers, defined outside this header.
// NOTE(review): presumably register the attribute and metric names of the
// stream package -- confirm.
00159 void init_attributes();
00160 void init_metrics();
00161
00162 public:
// Construct a stream for the given session; url defaults to a
// default-constructed saga::url.
00167 explicit stream (session const & s, saga::url url = saga::url());
00168
// Construct a stream from a URL only.
// NOTE(review): presumably uses a default session -- confirm in the
// implementation file.
00173 explicit stream (saga::url url);
00174
// Default constructor.
00179 stream ();
00180
// Construct from a generic saga::object.
// NOTE(review): presumably requires o to refer to a stream -- confirm.
00185 explicit stream (saga::object const& o);
00186
// Destructor.
00191 ~stream (void);
00192
// Factory form of the (session, url) constructor: returns stream(s, name).
00196 static stream create(session const& s, saga::url name = saga::url())
00197 {
00198 return stream(s, name);
00199 }
// NOTE(review): presumably supplies the create<Tag>(session, url) overload
// that the tagged factory below forwards to -- confirm.
00201 SAGA_CALL_CREATE_2_DEF_1(session const&, saga::url, saga::url())
00203
// Factory form of the URL-only constructor: returns stream(name).
00206 static stream create(saga::url name = saga::url())
00207 {
00208 return stream(name);
00209 }
// Tagged factory without an explicit session: forwards to
// create<Tag>(session, url) using detail::get_the_session() and returns the
// resulting saga::task.
00210 template <typename Tag>
00211 static saga::task create(saga::url name = saga::url())
00212 {
00213 return create<Tag>(detail::get_the_session(), name);
00214 }
00215
// Assign from a generic saga::object.
00220 stream &operator= (saga::object const& o);
00221
// Synchronous get_url: runs get_urlpriv with the Sync tag and returns the
// task result as a saga::url.
00228 saga::url get_url() const
00229 {
00230 saga::task t = get_urlpriv(saga::task_base::Sync());
00231 return t.get_result<saga::url>();
00232 }
00233 SAGA_CALL_CONST_PUB_0_DEF_0(get_url)
00234
00235
// Synchronous get_context: runs get_contextpriv with the Sync tag and
// returns the task result as a saga::context.
00241 saga::context get_context() const
00242 {
00243 saga::task t = get_contextpriv(saga::task_base::Sync());
00244 return t.get_result<saga::context>();
00245 }
00246 SAGA_CALL_CONST_PUB_0_DEF_0(get_context)
00247
00248
// Synchronous connect: runs connectpriv with the Sync tag and returns the
// task result as a saga::context.
// NOTE(review): confirm that the connect task really produces a
// saga::context result; a connect operation returning a context is unusual.
00254 saga::context connect()
00255 {
00256 saga::task t = connectpriv(saga::task_base::Sync());
00257 return t.get_result<saga::context>();
00258 }
00259 SAGA_CALL_PUB_0_DEF_0(connect)
00260
00261
// Synchronous wait: runs waitpriv(what, timeout) with the Sync tag and
// returns the task result as a vector of activity values.
// timeout defaults to -1.0; NOTE(review): the meaning of a negative timeout
// is defined by the implementation -- confirm.
00270 std::vector<saga::stream::activity>
00271 wait(saga::stream::activity what, double timeout = -1.0)
00272 {
00273 saga::task t = waitpriv(what, timeout, saga::task_base::Sync());
00274 return t.get_result<std::vector<saga::stream::activity> >();
00275 }
00276 SAGA_CALL_PUB_2_DEF_1(wait, saga::stream::activity, double, -1.0)
00277
00278
// Synchronous close: runs closepriv(timeout) with the Sync tag. get_result()
// is called without a result type and its value is not used. timeout
// defaults to 0.0.
00283 void close(double timeout = 0.0)
00284 {
00285 saga::task t = closepriv(timeout, saga::task_base::Sync());
00286 t.get_result ();
00287 }
00288 SAGA_CALL_PUB_1_DEF_1(close, double, 0.0)
00289
00290
// Synchronous read: runs readpriv(buffer, length) with the Sync tag and
// returns the task result as saga::ssize_t.
// NOTE(review): the result is presumably the number of bytes read, and the
// meaning of the default length == 0 is defined by the implementation --
// confirm.
00298 saga::ssize_t read(saga::mutable_buffer buffer, saga::ssize_t length = 0)
00299 {
00300 saga::task t = readpriv(buffer, length, saga::task_base::Sync());
00301 return t.get_result<saga::ssize_t>();
00302 }
00303 SAGA_CALL_PUB_2_DEF_1(read, saga::mutable_buffer, saga::ssize_t, 0)
00304
00305
// Synchronous write: runs writepriv(buffer, length) with the Sync tag and
// returns the task result as saga::ssize_t.
// NOTE(review): the result is presumably the number of bytes written, and
// the meaning of the default length == 0 is defined by the implementation --
// confirm.
00313 saga::ssize_t write(saga::const_buffer buffer, saga::ssize_t length = 0)
00314 {
00315 saga::task t = writepriv(buffer, length, saga::task_base::Sync());
00316 return t.get_result<saga::ssize_t>();
00317 }
00318 SAGA_CALL_PUB_2_DEF_1(write, saga::const_buffer, saga::ssize_t, 0)
00319 };
00320
00321 }
00322
00323 namespace detail
00324 {
00325
00326
00327
00328 template<>
00329 struct create_default<saga::stream::stream>
00330 {
00331 static saga::stream::stream* call()
00332 {
00333 return new saga::stream::stream(1);
00334 }
00335 template <typename T_> static void call(T_* obj)
00336 {
00337 new (obj) saga::stream::stream(1);
00338 }
00339 };
00340 }
00341
00342 }
00343
00344
00345 #if defined(BOOST_MSVC)
00346 #pragma warning(pop)
00347 #endif
00348
00349 #endif // SAGA_COMM_STREAM_STREAM_HPP
00350