Package proton :: Module _message
[frames] | no frames]

Source Code for Module proton._message

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  from cproton import PN_STATUS_SETTLED, PN_DEFAULT_PRIORITY, PN_STATUS_MODIFIED, PN_STATUS_RELEASED, PN_STATUS_ABORTED, \ 
 23      PN_STATUS_REJECTED, PN_STATUS_PENDING, PN_STATUS_UNKNOWN, PN_STATUS_ACCEPTED, \ 
 24      PN_OVERFLOW, \ 
 25      pn_message_set_delivery_count, pn_message_set_address, pn_message_properties, \ 
 26      pn_message_get_user_id, pn_message_set_content_encoding, pn_message_get_subject, pn_message_get_priority, \ 
 27      pn_message_get_content_encoding, pn_message_body, \ 
 28      pn_message_correlation_id, pn_message_get_address, pn_message_set_content_type, pn_message_get_group_id, \ 
 29      pn_message_set_expiry_time, pn_message_set_creation_time, pn_message_error, \ 
 30      pn_message_is_first_acquirer, pn_message_set_priority, \ 
 31      pn_message_free, pn_message_get_creation_time, pn_message_is_inferred, pn_message_set_subject, \ 
 32      pn_message_set_user_id, pn_message_set_group_id, \ 
 33      pn_message_id, pn_message_clear, pn_message_set_durable, \ 
 34      pn_message_set_first_acquirer, pn_message_get_delivery_count, \ 
 35      pn_message_decode, pn_message_set_reply_to_group_id, \ 
 36      pn_message_get_group_sequence, pn_message_set_reply_to, \ 
 37      pn_message_set_ttl, pn_message_get_reply_to, pn_message, pn_message_annotations, pn_message_is_durable, \ 
 38      pn_message_instructions, pn_message_get_content_type, \ 
 39      pn_message_get_reply_to_group_id, pn_message_get_ttl, pn_message_encode, pn_message_get_expiry_time, \ 
 40      pn_message_set_group_sequence, pn_message_set_inferred, \ 
 41      pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text 
 42   
 43  from . import _compat 
 44  from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode 
 45  from ._data import Data, ulong, symbol 
 46  from ._endpoints import Link 
 47  from ._exceptions import EXCEPTIONS, MessageException 
 48   
 49  # 
 50  # Hack to provide Python2 <---> Python3 compatibility 
 51  try: 
 52      unicode() 
 53  except NameError: 
 54      unicode = str 
 55   
 56   
# Symbolic status constants exported by this module. Each one is a named
# Constant instance.
# NOTE(review): presumably these describe delivery/tracker states - confirm
# against the callers that consume STATUSES.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Lookup table from the PN_STATUS_* codes imported from cproton to the
# constants defined above. PN_STATUS_UNKNOWN deliberately maps to None
# rather than to a Constant.
STATUSES = {
    PN_STATUS_ABORTED: ABORTED,
    PN_STATUS_ACCEPTED: ACCEPTED,
    PN_STATUS_REJECTED: REJECTED,
    PN_STATUS_RELEASED: RELEASED,
    PN_STATUS_MODIFIED: MODIFIED,
    PN_STATUS_PENDING: PENDING,
    PN_STATUS_SETTLED: SETTLED,
    PN_STATUS_UNKNOWN: None
}
 75   
 76   
class Message(object):
    """The L{Message} class is a mutable holder of message content.

    @ivar instructions: delivery instructions for the message
    @type instructions: dict
    @ivar annotations: infrastructure defined message annotations
    @type annotations: dict
    @ivar properties: application defined message properties
    @type properties: dict
    @ivar body: message body
    @type body: bytes | unicode | dict | list | int | long | float | UUID
    """

    DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY

    def __init__(self, body=None, **kwargs):
        """
        @param body: initial value for the L{body} attribute
        @param kwargs: Message property name/value pairs to initialise the Message
        """
        self._msg = pn_message()
        self._id = Data(pn_message_id(self._msg))
        self._correlation_id = Data(pn_message_correlation_id(self._msg))
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = body
        for k, v in _compat.iteritems(kwargs):
            getattr(self, k)  # Raise exception if it's not a valid attribute.
            setattr(self, k, v)

    def __del__(self):
        # _msg is absent if construction failed before it was assigned.
        # The attribute is removed after freeing so that a repeated call
        # cannot free the same handle twice.
        if hasattr(self, "_msg"):
            pn_message_free(self._msg)
            del self._msg

    def _check(self, err):
        """
        Pass through a non-negative return code, or raise for a negative one.

        @param err: return code from a pn_message_* call
        @return: err unchanged when it is not negative
        @raise MessageException: (or the class registered for err in
            EXCEPTIONS) when err is negative; the text comes from the
            message's error object
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, MessageException)
            raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
        else:
            return err

    def _check_property_keys(self):
        """
        Ensure that every key of L{properties} is a text string.

        Text keys are left alone. Python 2 byte-string keys are replaced
        in place by their UTF-8 decoded text equivalent. Any other key
        type is rejected.

        @raise MessageException: if a key is neither text nor a byte string
        """
        # Iterate over a snapshot because keys may be replaced in the loop.
        for k in list(self.properties.keys()):
            if isinstance(k, unicode):
                # py2 unicode, py3 str (via hack definition)
                continue
            # If key is binary then change to string
            elif isinstance(k, str):
                # py2 str: decode (not encode) so the key really becomes
                # text; encoding a byte string just yields bytes again.
                self.properties[k.decode('utf-8')] = self.properties.pop(k)
            else:
                raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k)))

    def _pre_encode(self):
        """
        Copy the Python-side sections (instructions, annotations,
        properties, body) into the underlying message before encoding.
        Each section is cleared first and only written if it is not None.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        inst.clear()
        if self.instructions is not None:
            inst.put_object(self.instructions)
        ann.clear()
        if self.annotations is not None:
            ann.put_object(self.annotations)
        props.clear()
        if self.properties is not None:
            # Validate/normalise the keys before they are written out.
            self._check_property_keys()
            props.put_object(self.properties)
        body.clear()
        if self.body is not None:
            body.put_object(self.body)

    def _post_decode(self):
        """
        Load the sections of the underlying message back into the Python
        attributes after decoding. A section with no content becomes None.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        if inst.next():
            self.instructions = inst.get_object()
        else:
            self.instructions = None
        if ann.next():
            self.annotations = ann.get_object()
        else:
            self.annotations = None
        if props.next():
            self.properties = props.get_object()
        else:
            self.properties = None
        if body.next():
            self.body = body.get_object()
        else:
            self.body = None

    def clear(self):
        """
        Clears the contents of the L{Message}. All fields will be reset to
        their default values.
        """
        pn_message_clear(self._msg)
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = None

    def _is_inferred(self):
        return pn_message_is_inferred(self._msg)

    def _set_inferred(self, value):
        # The value is coerced to bool before being handed to the C layer.
        self._check(pn_message_set_inferred(self._msg, bool(value)))

    inferred = property(_is_inferred, _set_inferred, doc="""
    The inferred flag for a message indicates how the message content
    is encoded into AMQP sections. If inferred is true then binary and
    list values in the body of the message will be encoded as AMQP DATA
    and AMQP SEQUENCE sections, respectively. If inferred is false,
    then all values in the body of the message will be encoded as AMQP
    VALUE sections regardless of their type.
    """)

    def _is_durable(self):
        return pn_message_is_durable(self._msg)

    def _set_durable(self, value):
        self._check(pn_message_set_durable(self._msg, bool(value)))

    durable = property(_is_durable, _set_durable,
                       doc="""
    The durable property indicates that the message should be held durably
    by any intermediaries taking responsibility for the message.
    """)

    def _get_priority(self):
        return pn_message_get_priority(self._msg)

    def _set_priority(self, value):
        self._check(pn_message_set_priority(self._msg, value))

    priority = property(_get_priority, _set_priority,
                        doc="""
    The priority of the message.
    """)

    def _get_ttl(self):
        # Converted with millis2secs for the caller.
        return millis2secs(pn_message_get_ttl(self._msg))

    def _set_ttl(self, value):
        # Converted with secs2millis before being stored.
        self._check(pn_message_set_ttl(self._msg, secs2millis(value)))

    ttl = property(_get_ttl, _set_ttl,
                   doc="""
    The time to live of the message measured in seconds. Expired messages
    may be dropped.
    """)

    def _is_first_acquirer(self):
        return pn_message_is_first_acquirer(self._msg)

    def _set_first_acquirer(self, value):
        self._check(pn_message_set_first_acquirer(self._msg, bool(value)))

    first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                              doc="""
    True iff the recipient is the first to acquire the message.
    """)

    def _get_delivery_count(self):
        return pn_message_get_delivery_count(self._msg)

    def _set_delivery_count(self, value):
        self._check(pn_message_set_delivery_count(self._msg, value))

    delivery_count = property(_get_delivery_count, _set_delivery_count,
                              doc="""
    The number of delivery attempts made for this message.
    """)

    def _get_id(self):
        return self._id.get_object()

    def _set_id(self, value):
        # Integer ids are wrapped as ulong before being stored.
        if isinteger(value):
            value = ulong(value)
        self._id.rewind()
        self._id.put_object(value)

    id = property(_get_id, _set_id,
                  doc="""
    The id of the message.
    """)

    def _get_user_id(self):
        return pn_message_get_user_id(self._msg)

    def _set_user_id(self, value):
        self._check(pn_message_set_user_id(self._msg, value))

    user_id = property(_get_user_id, _set_user_id,
                       doc="""
    The user id of the message creator.
    """)

    def _get_address(self):
        return utf82unicode(pn_message_get_address(self._msg))

    def _set_address(self, value):
        self._check(pn_message_set_address(self._msg, unicode2utf8(value)))

    address = property(_get_address, _set_address,
                       doc="""
    The address of the message.
    """)

    def _get_subject(self):
        return utf82unicode(pn_message_get_subject(self._msg))

    def _set_subject(self, value):
        self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))

    subject = property(_get_subject, _set_subject,
                       doc="""
    The subject of the message.
    """)

    def _get_reply_to(self):
        return utf82unicode(pn_message_get_reply_to(self._msg))

    def _set_reply_to(self, value):
        self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))

    reply_to = property(_get_reply_to, _set_reply_to,
                        doc="""
    The reply-to address for the message.
    """)

    def _get_correlation_id(self):
        return self._correlation_id.get_object()

    def _set_correlation_id(self, value):
        # Integer ids are wrapped as ulong before being stored.
        if isinteger(value):
            value = ulong(value)
        self._correlation_id.rewind()
        self._correlation_id.put_object(value)

    correlation_id = property(_get_correlation_id, _set_correlation_id,
                              doc="""
    The correlation-id for the message.
    """)

    def _get_content_type(self):
        # The stored text is returned wrapped as a symbol.
        return symbol(utf82unicode(pn_message_get_content_type(self._msg)))

    def _set_content_type(self, value):
        self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))

    content_type = property(_get_content_type, _set_content_type,
                            doc="""
    The content-type of the message.
    """)

    def _get_content_encoding(self):
        # The stored text is returned wrapped as a symbol.
        return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))

    def _set_content_encoding(self, value):
        self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))

    content_encoding = property(_get_content_encoding, _set_content_encoding,
                                doc="""
    The content-encoding of the message.
    """)

    def _get_expiry_time(self):
        return millis2secs(pn_message_get_expiry_time(self._msg))

    def _set_expiry_time(self, value):
        self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))

    expiry_time = property(_get_expiry_time, _set_expiry_time,
                           doc="""
    The expiry time of the message.
    """)

    def _get_creation_time(self):
        return millis2secs(pn_message_get_creation_time(self._msg))

    def _set_creation_time(self, value):
        self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))

    creation_time = property(_get_creation_time, _set_creation_time,
                             doc="""
    The creation time of the message.
    """)

    def _get_group_id(self):
        return utf82unicode(pn_message_get_group_id(self._msg))

    def _set_group_id(self, value):
        self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))

    group_id = property(_get_group_id, _set_group_id,
                        doc="""
    The group id of the message.
    """)

    def _get_group_sequence(self):
        return pn_message_get_group_sequence(self._msg)

    def _set_group_sequence(self, value):
        self._check(pn_message_set_group_sequence(self._msg, value))

    group_sequence = property(_get_group_sequence, _set_group_sequence,
                              doc="""
    The sequence of the message within its group.
    """)

    def _get_reply_to_group_id(self):
        return utf82unicode(pn_message_get_reply_to_group_id(self._msg))

    def _set_reply_to_group_id(self, value):
        self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))

    reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                                 doc="""
    The group-id for any replies.
    """)

    def encode(self):
        """
        Encodes the message and returns the encoded data.

        The Python-side sections are first copied into the underlying
        message. Encoding starts with a 16 byte buffer and the size is
        doubled each time the encoder reports PN_OVERFLOW.

        @return: the data produced by pn_message_encode
        @raise MessageException: (or the exception mapped from the error
            code) if encoding fails for a reason other than overflow
        """
        self._pre_encode()
        sz = 16
        while True:
            err, data = pn_message_encode(self._msg, sz)
            if err == PN_OVERFLOW:
                sz *= 2
                continue
            else:
                self._check(err)
                return data

    def decode(self, data):
        """
        Decodes data into this message and refreshes the Python-side
        sections (instructions, annotations, properties, body) from it.

        @param data: encoded message data
        @raise MessageException: (or the exception mapped from the error
            code) if decoding fails
        """
        self._check(pn_message_decode(self._msg, data))
        self._post_decode()

    def send(self, sender, tag=None):
        """
        Encodes the message and sends it on the given sender as a new
        delivery. If the sender's snd_settle_mode is Link.SND_SETTLED the
        delivery is settled before it is returned.

        @param sender: the sending link to transmit the message on
        @param tag: delivery tag to use; when not given (or falsy) one is
            obtained from sender.delivery_tag()
        @return: the delivery created for the message
        """
        dlv = sender.delivery(tag or sender.delivery_tag())
        encoded = self.encode()
        sender.stream(encoded)
        sender.advance()
        if sender.snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        return dlv

    def recv(self, link):
        """
        Receives and decodes the message content for the current delivery
        from the link. Upon success it will return the current delivery
        for the link. If there is no current delivery, or if the current
        delivery is incomplete, or if the link is not a receiver, it will
        return None.

        @type link: Link
        @param link: the link to receive a message from
        @return: the delivery associated with the decoded message (or None)
        """
        if link.is_sender:
            return None
        dlv = link.current
        if not dlv or dlv.partial:
            return None
        dlv.encoded = link.recv(dlv.pending)
        link.advance()
        # the sender has already forgotten about the delivery, so we might
        # as well too
        if link.remote_snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        self.decode(dlv.encoded)
        return dlv

    def __repr2__(self):
        """
        Alternative textual form built in Python: lists every attribute
        named below whose value is truthy as ``name=repr(value)``.
        """
        props = []
        for attr in ("inferred", "address", "reply_to", "durable", "ttl",
                     "priority", "first_acquirer", "delivery_count", "id",
                     "correlation_id", "user_id", "group_id", "group_sequence",
                     "reply_to_group_id", "instructions", "annotations",
                     "properties", "body"):
            value = getattr(self, attr)
            if value:
                props.append("%s=%r" % (attr, value))
        return "Message(%s)" % ", ".join(props)

    def __repr__(self):
        """
        Returns the text that pn_inspect produces for the underlying message.

        @raise MessageException: (or the exception mapped from the error
            code) if inspection reports an error
        """
        tmp = pn_string(None)
        try:
            err = pn_inspect(self._msg, tmp)
            result = pn_string_get(tmp)
        finally:
            # Always release the temporary string, even if inspection raises.
            pn_free(tmp)
        self._check(err)
        return result
