49 #include "XDRStreamMarshaller.h"
50 #ifdef USE_POSIX_THREADS
51 #include "MarshallerThread.h"
// Definition of the static member d_buf. It starts as a null pointer and the
// constructor allocates it with malloc the first time it finds it still null.
67 char *XDRStreamMarshaller::d_buf = 0;
// Size in bytes used for the d_buf allocation, for the d_sink XDR memory
// stream, and as the upper limit checked in put_opaque.
68 static const int XDR_DAP_BUFF_SIZE=256;
// Construct a marshaller bound to the output stream out.
// Initializes d_partial_put_byte_count to 0 and tm to 0, allocates d_buf if it
// has not been allocated yet, and creates d_sink as an XDR encoder over d_buf.
// Throws Error(internal_error, ...) when the allocation fails.
79 XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
80 d_out(out), d_partial_put_byte_count(0), tm(0)
// Allocate the buffer only when d_buf is still null.
82 if (!d_buf) d_buf = (
char *) malloc(XDR_DAP_BUFF_SIZE);
// Still null after malloc means the allocation failed.
83 if (!d_buf)
throw Error(internal_error,
"Failed to allocate memory for data serialization.");
// d_sink encodes into d_buf and may use at most XDR_DAP_BUFF_SIZE bytes.
85 xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
// NOTE(review): the code guarded by this #ifdef is not visible in this extract.
87 #ifdef USE_POSIX_THREADS
// Destructor.
// NOTE(review): apart from the USE_POSIX_THREADS guard below, the body is not
// visible in this extract, so what it releases cannot be stated from here.
92 XDRStreamMarshaller::~XDRStreamMarshaller()
96 #ifdef USE_POSIX_THREADS
// Encode one byte value with XDR into d_buf and write the encoded bytes to d_out.
// The three messages below cover failure to reset the stream position, failure
// to encode, and failure to read back the stream position.
102 void XDRStreamMarshaller::put_byte(dods_byte val)
// Rewind d_sink so the value is encoded at the start of d_buf.
104 if (!xdr_setpos(&d_sink, 0))
105 throw Error(
"Network I/O Error. Could not send byte data - unable to set stream position.");
// Encode val through the XDR char coder; the cast passes the byte as a char pointer.
107 if (!xdr_char(&d_sink, (
char *) &val))
109 "Network I/O Error. Could not send byte data.");
// The stream position after encoding is used as the number of bytes to send.
111 unsigned int bytes_written = xdr_getpos(&d_sink);
114 "Network I/O Error. Could not send byte data - unable to get stream position.");
// Threaded builds only: a Locker is constructed from the mutex, condition and
// child thread count held by tm before the write below.
// NOTE(review): presumably this blocks until no writer thread is active - confirm in MarshallerThread.h.
116 #ifdef USE_POSIX_THREADS
117 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
120 d_out.write(d_buf, bytes_written);
// Encode one 16-bit signed integer with XDR into d_buf and write it to d_out.
// The messages below cover, in order, failure to reset the stream position,
// failure to encode, and failure to read back the stream position.
123 void XDRStreamMarshaller::put_int16(dods_int16 val)
// Start encoding at offset 0 of the buffer behind d_sink.
125 if (!xdr_setpos(&d_sink, 0))
127 "Network I/O Error. Could not send int 16 data - unable to set stream position.");
// XDR_INT16 is the 16-bit integer coder; it is defined outside this file section.
129 if (!XDR_INT16(&d_sink, &val))
131 "Network I/O Error. Could not send int 16 data.");
// Position after encoding equals the encoded length passed to write() below.
133 unsigned int bytes_written = xdr_getpos(&d_sink);
136 "Network I/O Error. Could not send int 16 data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
138 #ifdef USE_POSIX_THREADS
139 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
142 d_out.write(d_buf, bytes_written);
145 void XDRStreamMarshaller::put_int32(dods_int32 val)
147 if (!xdr_setpos(&d_sink, 0))
149 "Network I/O Error. Could not send int 32 data - unable to set stream position.");
151 if (!XDR_INT32(&d_sink, &val))
153 "Network I/O Error. Culd not read int 32 data.");
155 unsigned int bytes_written = xdr_getpos(&d_sink);
158 "Network I/O Error. Could not send int 32 data - unable to get stream position.");
160 #ifdef USE_POSIX_THREADS
161 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
164 d_out.write(d_buf, bytes_written);
// Encode one 32-bit float with XDR into d_buf and write the encoded bytes to d_out.
// The messages below cover, in order, failure to reset the stream position,
// failure to encode, and failure to read back the stream position.
167 void XDRStreamMarshaller::put_float32(dods_float32 val)
// Rewind d_sink to offset 0 before encoding.
169 if (!xdr_setpos(&d_sink, 0))
171 "Network I/O Error. Could not send float 32 data - unable to set stream position.");
// Encode with the XDR float coder.
173 if (!xdr_float(&d_sink, &val))
175 "Network I/O Error. Could not send float 32 data.");
// The stream position after encoding is the byte count handed to write().
177 unsigned int bytes_written = xdr_getpos(&d_sink);
180 "Network I/O Error. Could not send float 32 data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
182 #ifdef USE_POSIX_THREADS
183 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
186 d_out.write(d_buf, bytes_written);
// Encode one 64-bit float with XDR into d_buf and write the encoded bytes to d_out.
// The messages below cover, in order, failure to reset the stream position,
// failure to encode, and failure to read back the stream position.
189 void XDRStreamMarshaller::put_float64(dods_float64 val)
// Rewind d_sink to offset 0 before encoding.
191 if (!xdr_setpos(&d_sink, 0))
193 "Network I/O Error. Could not send float 64 data - unable to set stream position.");
// Encode with the XDR double coder.
195 if (!xdr_double(&d_sink, &val))
197 "Network I/O Error. Could not send float 64 data.");
// The stream position after encoding is the byte count handed to write().
199 unsigned int bytes_written = xdr_getpos(&d_sink);
202 "Network I/O Error. Could not send float 64 data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
204 #ifdef USE_POSIX_THREADS
205 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
208 d_out.write(d_buf, bytes_written);
// Encode one 16-bit unsigned integer with XDR into d_buf and write it to d_out.
// The messages below cover, in order, failure to reset the stream position,
// failure to encode, and failure to read back the stream position.
211 void XDRStreamMarshaller::put_uint16(dods_uint16 val)
// Rewind d_sink to offset 0 before encoding.
213 if (!xdr_setpos(&d_sink, 0))
215 "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
// XDR_UINT16 is the unsigned 16-bit coder; it is defined outside this file section.
217 if (!XDR_UINT16(&d_sink, &val))
219 "Network I/O Error. Could not send uint 16 data.");
// The stream position after encoding is the byte count handed to write().
221 unsigned int bytes_written = xdr_getpos(&d_sink);
224 "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
226 #ifdef USE_POSIX_THREADS
227 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
230 d_out.write(d_buf, bytes_written);
// Encode one 32-bit unsigned integer with XDR into d_buf and write it to d_out.
// The messages below cover, in order, failure to reset the stream position,
// failure to encode, and failure to read back the stream position.
233 void XDRStreamMarshaller::put_uint32(dods_uint32 val)
// Rewind d_sink to offset 0 before encoding.
235 if (!xdr_setpos(&d_sink, 0))
237 "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
// XDR_UINT32 is the unsigned 32-bit coder; it is defined outside this file section.
239 if (!XDR_UINT32(&d_sink, &val))
241 "Network I/O Error. Could not send uint 32 data.");
// The stream position after encoding is the byte count handed to write().
243 unsigned int bytes_written = xdr_getpos(&d_sink);
246 "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
248 #ifdef USE_POSIX_THREADS
249 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
252 d_out.write(d_buf, bytes_written);
// Encode a string with XDR into a temporary buffer sized for this call and
// write the encoded bytes to d_out. Unlike the scalar put methods, this does
// not use the shared d_buf and d_sink.
255 void XDRStreamMarshaller::put_str(
const string &val)
// Buffer size is the string length plus 8 extra bytes.
// NOTE(review): presumably the extra bytes cover the XDR length word and padding - confirm.
257 int size = val.length() + 8;
260 vector<char> str_buf(size);
// str_sink is an XDR encoder over str_buf.
263 xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
265 if (!xdr_setpos(&str_sink, 0))
267 "Network I/O Error. Could not send string data - unable to set stream position.");
// xdr_string takes a pointer to a char pointer, hence the local copy of the
// c_str() pointer and the cast. size is passed as the maximum string length.
269 const char *out_tmp = val.c_str();
270 if (!xdr_string(&str_sink, (
char **) &out_tmp, size))
272 "Network I/O Error. Could not send string data.");
// The stream position after encoding is the byte count handed to write().
274 unsigned int bytes_written = xdr_getpos(&str_sink);
277 "Network I/O Error. Could not send string data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
279 #ifdef USE_POSIX_THREADS
280 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
283 d_out.write(&str_buf[0], bytes_written);
// NOTE(review): xdr_destroy appears twice and the lines between the two calls
// are not visible in this extract; presumably one is on the normal path and
// one on an error path - confirm they cannot both run.
285 xdr_destroy(&str_sink);
288 xdr_destroy(&str_sink);
// Send a URL given as a string.
// NOTE(review): the body is not visible in this extract, so how the value is
// encoded cannot be stated from here.
293 void XDRStreamMarshaller::put_url(
const string &val)
// Encode len bytes of opaque data from val with XDR into d_buf and write the
// encoded bytes to d_out. Throws Error when len exceeds XDR_DAP_BUFF_SIZE.
// The later messages cover failure to reset the stream position, failure to
// encode, and failure to read back the stream position.
298 void XDRStreamMarshaller::put_opaque(
char *val,
unsigned int len)
// The data must fit in the buffer behind d_sink, which is XDR_DAP_BUFF_SIZE bytes.
300 if (len > XDR_DAP_BUFF_SIZE)
301 throw Error(
"Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
// Rewind d_sink to offset 0 before encoding.
303 if (!xdr_setpos(&d_sink, 0))
305 "Network I/O Error. Could not send opaque data - unable to set stream position.");
307 if (!xdr_opaque(&d_sink, val, len))
309 "Network I/O Error. Could not send opaque data.");
// The stream position after encoding is the byte count handed to write().
311 unsigned int bytes_written = xdr_getpos(&d_sink);
314 "Network I/O Error. Could not send opaque data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
316 #ifdef USE_POSIX_THREADS
317 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
320 d_out.write(d_buf, bytes_written);
// Encode one int with XDR into d_buf and write the encoded bytes to d_out.
// The messages below cover, in order, failure to reset the stream position,
// failure to encode, and failure to read back the stream position.
323 void XDRStreamMarshaller::put_int(
int val)
// Rewind d_sink to offset 0 before encoding.
325 if (!xdr_setpos(&d_sink, 0))
327 "Network I/O Error. Could not send int data - unable to set stream position.");
// Encode with the XDR int coder.
329 if (!xdr_int(&d_sink, &val))
331 "Network I/O Error(1). Could not send int data.");
// The stream position after encoding is the byte count handed to write().
333 unsigned int bytes_written = xdr_getpos(&d_sink);
336 "Network I/O Error. Could not send int data - unable to get stream position.");
// Threaded builds only: construct a Locker from the state held by tm before writing.
338 #ifdef USE_POSIX_THREADS
339 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
342 d_out.write(d_buf, bytes_written);
// Overload that takes the Vector being sent. It forwards to the put_vector
// overload that takes a Type, passing vec.var()->type() as that argument.
345 void XDRStreamMarshaller::put_vector(
char *val,
int num,
int width, Vector &vec)
347 put_vector(val, num, width, vec.var()->type());
// NOTE(review): the signature of the function containing this statement is not
// visible in this extract.
// Reset the byte counter that the partial vector code increments as it writes.
363 d_partial_put_byte_count = 0;
// NOTE(review): the signature of the function containing these statements is
// not visible in this extract.
// Threaded builds only: construct a Locker from the state held by tm before writing.
374 #ifdef USE_POSIX_THREADS
375 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
// Work out how many bytes are needed to round d_partial_put_byte_count up to a
// multiple of 4: mod_4 is the count modulo 4, pad is 0 when already aligned.
382 unsigned int mod_4 = d_partial_put_byte_count & 0x03;
383 unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
// Four zero bytes; only the first pad of them are written.
386 vector<char> padding(4, 0);
388 d_out.write(&padding[0], pad);
// A failed stream state after the write is reported as an Error.
389 if (d_out.fail())
throw Error(
"Network I/O Error. Could not send vector data padding");
// Send a vector of bytes: encode num bytes from val with xdr_bytes into a heap
// buffer and send the encoded bytes to d_out. The Vector parameter is unnamed
// and not used by the visible code.
394 void XDRStreamMarshaller::put_vector(
char *val,
int num,
Vector &)
// A null data pointer is reported as an InternalErr.
396 if (!val)
throw InternalErr(__FILE__, __LINE__,
"Could not send byte vector data. Buffer pointer is not set.");
// Extra bytes allocated beyond num.
// NOTE(review): presumably room for the XDR length word and padding - confirm.
402 const unsigned int add_to = 8;
// Heap allocation of the encode buffer.
// NOTE(review): no matching delete[] is visible in this extract; confirm which
// path owns and frees byte_buf, in particular in threaded builds.
405 char *byte_buf =
new char[num + add_to];
// byte_sink is an XDR encoder over byte_buf.
408 xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
409 if (!xdr_setpos(&byte_sink, 0))
410 throw Error(
"Network I/O Error. Could not send byte vector data - unable to set stream position.");
// xdr_bytes takes pointers to the data pointer and to the count, hence the casts.
412 if (!xdr_bytes(&byte_sink, (
char **) &val, (
unsigned int *) &num, num + add_to))
413 throw Error(
"Network I/O Error(2). Could not send byte vector data - unable to encode data.");
// The stream position after encoding is the number of encoded bytes.
415 unsigned int bytes_written = xdr_getpos(&byte_sink);
417 throw Error(
"Network I/O Error. Could not send byte vector data - unable to get stream position.");
// Threaded builds: take the Locker and increment the child thread count.
// NOTE(review): the call that hands the buffer to a writer thread is not visible here.
419 #ifdef USE_POSIX_THREADS
420 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
421 tm->increment_child_thread_count();
423 xdr_destroy(&byte_sink);
// NOTE(review): presumably the non-threaded path; the #else is not visible in this extract.
425 d_out.write(byte_buf, bytes_written);
426 xdr_destroy(&byte_sink);
// NOTE(review): judging by the message, presumably inside a catch handler - confirm.
432 DBG(cerr <<
"Caught an exception in put_vector_thread" << endl);
433 xdr_destroy(&byte_sink);
// Send a vector of num elements of width bytes each: encode them with
// xdr_array, using the coder returned by XDRUtils::xdr_coder(type), into a
// heap buffer and send the encoded bytes to d_out.
450 void XDRStreamMarshaller::put_vector(
char *val,
unsigned int num,
int width,
Type type)
// A null val is accepted only when there are no elements.
452 assert(val || num == 0);
// Size the buffer with at least 4 bytes per element, plus 4 more bytes.
// NOTE(review): presumably because XDR encodes each element in at least 4
// bytes and prefixes the array with a 4-byte count - confirm.
460 int use_width = width;
461 if (use_width < 4) use_width = 4;
465 int size = (num * use_width) + 4;
// Heap allocation of the encode buffer.
// NOTE(review): no matching delete[] is visible in this extract; confirm which
// path owns and frees vec_buf, in particular in threaded builds.
469 char *vec_buf =
new char[size];
// vec_sink is an XDR encoder over vec_buf.
472 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
475 if (!xdr_setpos(&vec_sink, 0))
476 throw Error(
"Network I/O Error. Could not send vector data - unable to set stream position.");
// xdr_array takes pointers to the data pointer and to the count, hence the
// casts; the original width (not use_width) is passed as the element size.
479 if (!xdr_array(&vec_sink, (
char **) &val, (
unsigned int *) &num, size, width,
XDRUtils::xdr_coder(type)))
480 throw Error(
"Network I/O Error(2). Could not send vector data - unable to encode.");
// The stream position after encoding is the number of encoded bytes.
483 unsigned int bytes_written = xdr_getpos(&vec_sink);
485 throw Error(
"Network I/O Error. Could not send vector data - unable to get stream position.");
// Threaded builds: take the Locker and increment the child thread count.
// NOTE(review): the call that hands the buffer to a writer thread is not visible here.
487 #ifdef USE_POSIX_THREADS
488 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
489 tm->increment_child_thread_count();
491 xdr_destroy(&vec_sink);
// NOTE(review): presumably the non-threaded path; the #else is not visible in this extract.
493 d_out.write(vec_buf, bytes_written);
494 xdr_destroy(&vec_sink);
// NOTE(review): presumably cleanup on an error path; the surrounding lines are not visible.
499 xdr_destroy(&vec_sink);
// NOTE(review): the signature of the function containing these statements is
// not visible in this extract, and neither is the condition that selects
// between the two sections below.
// First section: byte data, encoded with xdr_bytes into a heap buffer of
// num + add_to bytes.
521 const unsigned int add_to = 8;
522 unsigned int bufsiz = num + add_to;
// NOTE(review): no matching delete[] is visible in this extract; confirm who frees byte_buf.
524 char *byte_buf =
new char[bufsiz];
527 xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
528 if (!xdr_setpos(&byte_sink, 0))
529 throw Error(
"Network I/O Error. Could not send byte vector data - unable to set stream position.");
531 if (!xdr_bytes(&byte_sink, (
char **) &val, (
unsigned int *) &num, bufsiz))
532 throw Error(
"Network I/O Error(2). Could not send byte vector data - unable to encode data.");
// Threaded builds: take the Locker and increment the child thread count.
// NOTE(review): the call that starts the writer thread is not visible here.
534 #ifdef USE_POSIX_THREADS
535 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
536 tm->increment_child_thread_count();
// Add the bytes sent to the running count that is later used to compute padding.
539 d_partial_put_byte_count += num;
542 xdr_destroy(&byte_sink);
// Write num bytes starting 4 bytes into the encoded buffer.
// NOTE(review): presumably the skipped bytes are the XDR length prefix - confirm.
547 d_out.write(byte_buf + 4, num);
550 throw Error (
"Network I/O Error. Could not send initial part of byte vector data");
// NOTE(review): presumably the non-threaded counterpart of the increment above - confirm.
553 d_partial_put_byte_count += num;
555 xdr_destroy(&byte_sink);
560 xdr_destroy(&byte_sink);
// Second section: elements wider than a byte, encoded with xdr_array.
// The buffer is sized with at least 4 bytes per element, plus 4 more bytes.
566 int use_width = (width < 4) ? 4 : width;
570 int size = (num * use_width) + 4;
// NOTE(review): no matching delete[] is visible in this extract; confirm who frees vec_buf.
574 char *vec_buf =
new char[size];
577 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
580 if (!xdr_setpos(&vec_sink, 0))
581 throw Error(
"Network I/O Error. Could not send vector data - unable to set stream position.");
// The original width (not use_width) is passed to xdr_array as the element size.
584 if (!xdr_array(&vec_sink, (
char **) &val, (
unsigned int *) &num, size, width,
XDRUtils::xdr_coder(type)))
585 throw Error(
"Network I/O Error(2). Could not send vector data -unable to encode data.");
// Threaded builds: take the Locker and increment the child thread count.
587 #ifdef USE_POSIX_THREADS
588 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
589 tm->increment_child_thread_count();
// The count grows by the buffer size minus the 4 bytes that are skipped when writing.
592 d_partial_put_byte_count += (size - 4);
594 xdr_destroy(&vec_sink);
// Write everything after the first 4 bytes of the encoded buffer.
// NOTE(review): presumably the skipped bytes are the XDR element count - confirm.
598 d_out.write(vec_buf + 4, size - 4);
601 throw Error (
"Network I/O Error. Could not send part of vector data");
604 d_partial_put_byte_count += (size - 4);
606 xdr_destroy(&vec_sink);
611 xdr_destroy(&vec_sink);
// Write one line to strm: the DapIndent left margin, a label naming
// XDRStreamMarshaller::dump, and the address of this object in parentheses.
// NOTE(review): the signature of the enclosing function is not visible in this extract.
620 strm << DapIndent::LMarg <<
"XDRStreamMarshaller::dump - (" << (
void *)
this <<
")" << endl;