1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 from cproton import PN_STATUS_SETTLED, PN_DEFAULT_PRIORITY, PN_STATUS_MODIFIED, PN_STATUS_RELEASED, PN_STATUS_ABORTED, \
23 PN_STATUS_REJECTED, PN_STATUS_PENDING, PN_STATUS_UNKNOWN, PN_STATUS_ACCEPTED, \
24 PN_OVERFLOW, \
25 pn_message_set_delivery_count, pn_message_set_address, pn_message_properties, \
26 pn_message_get_user_id, pn_message_set_content_encoding, pn_message_get_subject, pn_message_get_priority, \
27 pn_message_get_content_encoding, pn_message_body, \
28 pn_message_correlation_id, pn_message_get_address, pn_message_set_content_type, pn_message_get_group_id, \
29 pn_message_set_expiry_time, pn_message_set_creation_time, pn_message_error, \
30 pn_message_is_first_acquirer, pn_message_set_priority, \
31 pn_message_free, pn_message_get_creation_time, pn_message_is_inferred, pn_message_set_subject, \
32 pn_message_set_user_id, pn_message_set_group_id, \
33 pn_message_id, pn_message_clear, pn_message_set_durable, \
34 pn_message_set_first_acquirer, pn_message_get_delivery_count, \
35 pn_message_decode, pn_message_set_reply_to_group_id, \
36 pn_message_get_group_sequence, pn_message_set_reply_to, \
37 pn_message_set_ttl, pn_message_get_reply_to, pn_message, pn_message_annotations, pn_message_is_durable, \
38 pn_message_instructions, pn_message_get_content_type, \
39 pn_message_get_reply_to_group_id, pn_message_get_ttl, pn_message_encode, pn_message_get_expiry_time, \
40 pn_message_set_group_sequence, pn_message_set_inferred, \
41 pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text
42
43 from . import _compat
44 from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
45 from ._data import Data, ulong, symbol
46 from ._endpoints import Link
47 from ._exceptions import EXCEPTIONS, MessageException
48
49
50
# Fall back to ``str`` when the ``unicode`` builtin does not exist (the lookup
# raises NameError), so ``unicode`` is always bound at module level.
try:
    unicode
except NameError:
    unicode = str
55
56
# Symbolic status values; each is a Constant built from its own name.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Lookup table from the PN_STATUS_* codes imported from cproton to the
# constants above. PN_STATUS_UNKNOWN deliberately maps to None.
STATUSES = {
    PN_STATUS_ABORTED: ABORTED,
    PN_STATUS_ACCEPTED: ACCEPTED,
    PN_STATUS_REJECTED: REJECTED,
    PN_STATUS_RELEASED: RELEASED,
    PN_STATUS_MODIFIED: MODIFIED,
    PN_STATUS_PENDING: PENDING,
    PN_STATUS_SETTLED: SETTLED,
    PN_STATUS_UNKNOWN: None
}
75
76
78 """The L{Message} class is a mutable holder of message content.
79
80 @ivar instructions: delivery instructions for the message
81 @type instructions: dict
82 @ivar annotations: infrastructure defined message annotations
83 @type annotations: dict
84 @ivar properties: application defined message properties
85 @type properties: dict
86 @ivar body: message body
87 @type body: bytes | unicode | dict | list | int | long | float | UUID
88 """
89
90 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
91
def __init__(self, body=None, **kwargs):
    """
    Create the underlying pn_message and initialise the Python-side fields.

    @param body: initial value stored in the C{body} attribute
    @param kwargs: Message property name/value pairs to initialise the Message
    """
    msg = pn_message()
    self._msg = msg
    self._id = Data(pn_message_id(msg))
    self._correlation_id = Data(pn_message_correlation_id(msg))
    self.instructions = self.annotations = self.properties = None
    self.body = body
    for name, value in _compat.iteritems(kwargs):
        # Read the attribute before writing it so that an unknown name
        # raises AttributeError instead of silently creating a new attribute.
        getattr(self, name)
        setattr(self, name, value)
106
108 if hasattr(self, "_msg"):
109 pn_message_free(self._msg)
110 del self._msg
111
113 if err < 0:
114 exc = EXCEPTIONS.get(err, MessageException)
115 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
116 else:
117 return err
118
130
150
def _post_decode(self):
    """
    Refresh the Python-side fields from the underlying pn_message.

    instructions, annotations, properties and body are each read through a
    Data wrapper. A field whose wrapper reports no next item (``next()`` is
    falsy) is set to None; otherwise it receives ``get_object()``.
    """
    msg = self._msg
    # Build all four wrappers first, then read them in the same order.
    sections = (
        ("instructions", Data(pn_message_instructions(msg))),
        ("annotations", Data(pn_message_annotations(msg))),
        ("properties", Data(pn_message_properties(msg))),
        ("body", Data(pn_message_body(msg))),
    )
    for attr, data in sections:
        setattr(self, attr, data.get_object() if data.next() else None)
173
175 """
176 Clears the contents of the L{Message}. All fields will be reset to
177 their default values.
178 """
179 pn_message_clear(self._msg)
180 self.instructions = None
181 self.annotations = None
182 self.properties = None
183 self.body = None
184
def _is_inferred(self):
    """Return the result of pn_message_is_inferred for this message."""
    return pn_message_is_inferred(self._msg)

def _set_inferred(self, value):
    """Set the inferred flag to bool(value); a negative status raises via _check."""
    self._check(pn_message_set_inferred(self._msg, bool(value)))

inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")
199
def _is_durable(self):
    """Return the result of pn_message_is_durable for this message."""
    return pn_message_is_durable(self._msg)

def _set_durable(self, value):
    """Set the durable flag to bool(value); a negative status raises via _check."""
    self._check(pn_message_set_durable(self._msg, bool(value)))

durable = property(_is_durable, _set_durable,
                   doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
211
def _get_priority(self):
    """Return the result of pn_message_get_priority for this message."""
    return pn_message_get_priority(self._msg)

def _set_priority(self, value):
    """Pass value to pn_message_set_priority; a negative status raises via _check."""
    self._check(pn_message_set_priority(self._msg, value))

priority = property(_get_priority, _set_priority,
                    doc="""
The priority of the message.
""")
222
225
227 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
228
229 ttl = property(_get_ttl, _set_ttl,
230 doc="""
231 The time to live of the message measured in seconds. Expired messages
232 may be dropped.
233 """)
234
def _is_first_acquirer(self):
    """Return the result of pn_message_is_first_acquirer for this message."""
    return pn_message_is_first_acquirer(self._msg)

def _set_first_acquirer(self, value):
    """Set the first-acquirer flag to bool(value); a negative status raises via _check."""
    self._check(pn_message_set_first_acquirer(self._msg, bool(value)))

first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                          doc="""
True iff the recipient is the first to acquire the message.
""")
245
def _get_delivery_count(self):
    """Return the result of pn_message_get_delivery_count for this message."""
    return pn_message_get_delivery_count(self._msg)

def _set_delivery_count(self, value):
    """Pass value to pn_message_set_delivery_count; a negative status raises via _check."""
    self._check(pn_message_set_delivery_count(self._msg, value))

delivery_count = property(_get_delivery_count, _set_delivery_count,
                          doc="""
The number of delivery attempts made for this message.
""")
256
259
265
266 id = property(_get_id, _set_id,
267 doc="""
268 The id of the message.
269 """)
270
def _get_user_id(self):
    """Return the result of pn_message_get_user_id for this message."""
    return pn_message_get_user_id(self._msg)

def _set_user_id(self, value):
    """Pass value, unconverted, to pn_message_set_user_id; a negative status raises via _check."""
    self._check(pn_message_set_user_id(self._msg, value))

user_id = property(_get_user_id, _set_user_id,
                   doc="""
The user id of the message creator.
""")
281
284
286 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
287
288 address = property(_get_address, _set_address,
289 doc="""
290 The address of the message.
291 """)
292
295
297 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
298
299 subject = property(_get_subject, _set_subject,
300 doc="""
301 The subject of the message.
302 """)
303
306
308 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
309
310 reply_to = property(_get_reply_to, _set_reply_to,
311 doc="""
312 The reply-to address for the message.
313 """)
314
317
323
324 correlation_id = property(_get_correlation_id, _set_correlation_id,
325 doc="""
326 The correlation-id for the message.
327 """)
328
def _get_content_type(self):
    """Return the content type, run through utf82unicode and wrapped in symbol."""
    raw = pn_message_get_content_type(self._msg)
    return symbol(utf82unicode(raw))

def _set_content_type(self, value):
    """Store unicode2utf8(value) as the content type; a negative status raises via _check."""
    encoded = unicode2utf8(value)
    self._check(pn_message_set_content_type(self._msg, encoded))

content_type = property(_get_content_type, _set_content_type,
                        doc="""
The content-type of the message.
""")
339
def _get_content_encoding(self):
    """Return the content encoding, run through utf82unicode and wrapped in symbol."""
    raw = pn_message_get_content_encoding(self._msg)
    return symbol(utf82unicode(raw))

def _set_content_encoding(self, value):
    """Store unicode2utf8(value) as the content encoding; a negative status raises via _check."""
    encoded = unicode2utf8(value)
    self._check(pn_message_set_content_encoding(self._msg, encoded))

content_encoding = property(_get_content_encoding, _set_content_encoding,
                            doc="""
The content-encoding of the message.
""")
350
def _get_expiry_time(self):
    """Return pn_message_get_expiry_time converted with millis2secs."""
    return millis2secs(pn_message_get_expiry_time(self._msg))

def _set_expiry_time(self, value):
    """Store secs2millis(value) as the expiry time; a negative status raises via _check."""
    self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))

expiry_time = property(_get_expiry_time, _set_expiry_time,
                       doc="""
The expiry time of the message.
""")
361
def _get_creation_time(self):
    """Return pn_message_get_creation_time converted with millis2secs."""
    return millis2secs(pn_message_get_creation_time(self._msg))

def _set_creation_time(self, value):
    """Store secs2millis(value) as the creation time; a negative status raises via _check."""
    self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))

creation_time = property(_get_creation_time, _set_creation_time,
                         doc="""
The creation time of the message.
""")
372
375
377 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
378
379 group_id = property(_get_group_id, _set_group_id,
380 doc="""
381 The group id of the message.
382 """)
383
def _get_group_sequence(self):
    """Return the result of pn_message_get_group_sequence for this message."""
    return pn_message_get_group_sequence(self._msg)

def _set_group_sequence(self, value):
    """Pass value to pn_message_set_group_sequence; a negative status raises via _check."""
    self._check(pn_message_set_group_sequence(self._msg, value))

group_sequence = property(_get_group_sequence, _set_group_sequence,
                          doc="""
The sequence of the message within its group.
""")
394
def _get_reply_to_group_id(self):
    """Return pn_message_get_reply_to_group_id converted with utf82unicode."""
    return utf82unicode(pn_message_get_reply_to_group_id(self._msg))

def _set_reply_to_group_id(self, value):
    """Store unicode2utf8(value) as the reply-to group id; a negative status raises via _check."""
    self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))

reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                             doc="""
The group-id for any replies.
""")
405
407 self._pre_encode()
408 sz = 16
409 while True:
410 err, data = pn_message_encode(self._msg, sz)
411 if err == PN_OVERFLOW:
412 sz *= 2
413 continue
414 else:
415 self._check(err)
416 return data
417
def decode(self, data):
    """
    Decode encoded message content into this message.

    @param data: the encoded content handed to pn_message_decode
    """
    self._check(pn_message_decode(self._msg, data))
    # Pull the decoded sections back into the Python-side attributes.
    self._post_decode()
421
422 - def send(self, sender, tag=None):
430
def recv(self, link):
    """
    Receive and decode the message content for the link's current delivery.

    Returns None without touching the message when the link is a sender,
    when the link has no current delivery, or when the current delivery
    is still partial. Otherwise the pending bytes are read from the link,
    the link is advanced, and the content is decoded into this message.

    @type link: Link
    @param link: the link to receive a message from
    @return: the delivery associated with the decoded message (or None)
    """
    if link.is_sender:
        return None
    delivery = link.current
    if not delivery or delivery.partial:
        return None
    delivery.encoded = link.recv(delivery.pending)
    link.advance()
    # When the remote sender's settle mode is SND_SETTLED, settle the
    # delivery locally before decoding.
    if link.remote_snd_settle_mode == Link.SND_SETTLED:
        delivery.settle()
    self.decode(delivery.encoded)
    return delivery
455
457 props = []
458 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
459 "priority", "first_acquirer", "delivery_count", "id",
460 "correlation_id", "user_id", "group_id", "group_sequence",
461 "reply_to_group_id", "instructions", "annotations",
462 "properties", "body"):
463 value = getattr(self, attr)
464 if value: props.append("%s=%r" % (attr, value))
465 return "Message(%s)" % ", ".join(props)
466
468 tmp = pn_string(None)
469 err = pn_inspect(self._msg, tmp)
470 result = pn_string_get(tmp)
471 pn_free(tmp)
472 self._check(err)
473 return result
474