1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Message} -- A class for creating and/or accessing AMQP message content.
27 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
28 data.
29
30 """
31 from __future__ import absolute_import
32
33 from cproton import *
34 from .wrapper import Wrapper
35 from proton import _compat
36
37 import logging, weakref, socket, sys, threading
38
39 try:
40 handler = logging.NullHandler()
41 except AttributeError:
45
46 - def emit(self, record):
48
51
52 handler = NullHandler()
53
54 log = logging.getLogger("proton")
55 log.addHandler(handler)
56
57 try:
58 import uuid
62
63 except ImportError:
64 """
65 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
66 """
67 import struct
70 - def __init__(self, hex=None, bytes=None):
71 if [hex, bytes].count(None) != 1:
72 raise TypeError("need one of hex or bytes")
73 if bytes is not None:
74 self.bytes = bytes
75 elif hex is not None:
76 fields=hex.split("-")
77 fields[4:5] = [fields[4][:4], fields[4][4:]]
78 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
79
81 if isinstance(other, uuid.UUID):
82 return cmp(self.bytes, other.bytes)
83 else:
84 return -1
85
87 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
88
90 return "UUID(%r)" % str(self)
91
94
95 import os, random, time
96 rand = random.Random()
97 rand.seed((os.getpid(), time.time(), socket.gethostname()))
99 data = [rand.randint(0, 255) for i in xrange(16)]
100
101
102 data[6] &= 0x0F
103 data[6] |= 0x40
104
105
106 data[8] &= 0x3F
107 data[8] |= 0x80
108 return "".join(map(chr, data))
109
111 return uuid.UUID(bytes=random_uuid())
112
115
116
117
118
119 try:
120 bytes()
121 except NameError:
122 bytes = str
123 try:
124 long()
125 except NameError:
126 long = int
127 try:
128 unicode()
129 except NameError:
130 unicode = str
131
132
133 VERSION_MAJOR = PN_VERSION_MAJOR
134 VERSION_MINOR = PN_VERSION_MINOR
135 VERSION_POINT = PN_VERSION_POINT
136 VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
137 API_LANGUAGE = "C"
138 IMPLEMENTATION_LANGUAGE = "C"
147
149 """
150 The root of the proton exception hierarchy. All proton exception
151 classes derive from this exception.
152 """
153 pass
154
156 """
157 A timeout exception indicates that a blocking operation has timed
158 out.
159 """
160 pass
161
163 """
164 An interrupt exception indicates that a blocking operation was interrupted.
165 """
166 pass
167
169 """
170 The MessageException class is the root of the message exception
171 hierarchy. All exceptions generated by the Message class derive from
172 this exception.
173 """
174 pass
175
176 EXCEPTIONS = {
177 PN_TIMEOUT: Timeout,
178 PN_INTR: Interrupt
179 }
180
181 PENDING = Constant("PENDING")
182 ACCEPTED = Constant("ACCEPTED")
183 REJECTED = Constant("REJECTED")
184 RELEASED = Constant("RELEASED")
185 MODIFIED = Constant("MODIFIED")
186 ABORTED = Constant("ABORTED")
187 SETTLED = Constant("SETTLED")
188
189 STATUSES = {
190 PN_STATUS_ABORTED: ABORTED,
191 PN_STATUS_ACCEPTED: ACCEPTED,
192 PN_STATUS_REJECTED: REJECTED,
193 PN_STATUS_RELEASED: RELEASED,
194 PN_STATUS_MODIFIED: MODIFIED,
195 PN_STATUS_PENDING: PENDING,
196 PN_STATUS_SETTLED: SETTLED,
197 PN_STATUS_UNKNOWN: None
198 }
201 """The L{Message} class is a mutable holder of message content.
202
203 @ivar instructions: delivery instructions for the message
204 @type instructions: dict
205 @ivar annotations: infrastructure defined message annotations
206 @type annotations: dict
207 @ivar properties: application defined message properties
208 @type properties: dict
209 @ivar body: message body
210 @type body: bytes | unicode | dict | list | int | long | float | UUID
211 """
212
213 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
214
215 - def __init__(self, body=None, **kwargs):
216 """
217 @param kwargs: Message property name/value pairs to initialise the Message
218 """
219 self._msg = pn_message()
220 self._id = Data(pn_message_id(self._msg))
221 self._correlation_id = Data(pn_message_correlation_id(self._msg))
222 self.instructions = None
223 self.annotations = None
224 self.properties = None
225 self.body = body
226 for k,v in _compat.iteritems(kwargs):
227 getattr(self, k)
228 setattr(self, k, v)
229
231 if hasattr(self, "_msg"):
232 pn_message_free(self._msg)
233 del self._msg
234
236 if err < 0:
237 exc = EXCEPTIONS.get(err, MessageException)
238 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
239 else:
240 return err
241
248
268
269 - def _post_decode(self):
270 inst = Data(pn_message_instructions(self._msg))
271 ann = Data(pn_message_annotations(self._msg))
272 props = Data(pn_message_properties(self._msg))
273 body = Data(pn_message_body(self._msg))
274
275 if inst.next():
276 self.instructions = inst.get_object()
277 else:
278 self.instructions = None
279 if ann.next():
280 self.annotations = ann.get_object()
281 else:
282 self.annotations = None
283 if props.next():
284 self.properties = props.get_object()
285 else:
286 self.properties = None
287 if body.next():
288 self.body = body.get_object()
289 else:
290 self.body = None
291
293 """
294 Clears the contents of the L{Message}. All fields will be reset to
295 their default values.
296 """
297 pn_message_clear(self._msg)
298 self.instructions = None
299 self.annotations = None
300 self.properties = None
301 self.body = None
302
304 return pn_message_is_inferred(self._msg)
305
307 self._check(pn_message_set_inferred(self._msg, bool(value)))
308
309 inferred = property(_is_inferred, _set_inferred, doc="""
310 The inferred flag for a message indicates how the message content
311 is encoded into AMQP sections. If inferred is true then binary and
312 list values in the body of the message will be encoded as AMQP DATA
313 and AMQP SEQUENCE sections, respectively. If inferred is false,
314 then all values in the body of the message will be encoded as AMQP
315 VALUE sections regardless of their type.
316 """)
317
319 return pn_message_is_durable(self._msg)
320
322 self._check(pn_message_set_durable(self._msg, bool(value)))
323
324 durable = property(_is_durable, _set_durable,
325 doc="""
326 The durable property indicates that the message should be held durably
327 by any intermediaries taking responsibility for the message.
328 """)
329
331 return pn_message_get_priority(self._msg)
332
334 self._check(pn_message_set_priority(self._msg, value))
335
336 priority = property(_get_priority, _set_priority,
337 doc="""
338 The priority of the message.
339 """)
340
342 return millis2secs(pn_message_get_ttl(self._msg))
343
345 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
346
347 ttl = property(_get_ttl, _set_ttl,
348 doc="""
349 The time to live of the message measured in seconds. Expired messages
350 may be dropped.
351 """)
352
354 return pn_message_is_first_acquirer(self._msg)
355
357 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
358
359 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
360 doc="""
361 True iff the recipient is the first to acquire the message.
362 """)
363
365 return pn_message_get_delivery_count(self._msg)
366
368 self._check(pn_message_set_delivery_count(self._msg, value))
369
370 delivery_count = property(_get_delivery_count, _set_delivery_count,
371 doc="""
372 The number of delivery attempts made for this message.
373 """)
374
375
383 id = property(_get_id, _set_id,
384 doc="""
385 The id of the message.
386 """)
387
389 return pn_message_get_user_id(self._msg)
390
392 self._check(pn_message_set_user_id(self._msg, value))
393
394 user_id = property(_get_user_id, _set_user_id,
395 doc="""
396 The user id of the message creator.
397 """)
398
400 return utf82unicode(pn_message_get_address(self._msg))
401
403 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
404
405 address = property(_get_address, _set_address,
406 doc="""
407 The address of the message.
408 """)
409
411 return utf82unicode(pn_message_get_subject(self._msg))
412
414 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
415
416 subject = property(_get_subject, _set_subject,
417 doc="""
418 The subject of the message.
419 """)
420
422 return utf82unicode(pn_message_get_reply_to(self._msg))
423
425 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
426
427 reply_to = property(_get_reply_to, _set_reply_to,
428 doc="""
429 The reply-to address for the message.
430 """)
431
439
440 correlation_id = property(_get_correlation_id, _set_correlation_id,
441 doc="""
442 The correlation-id for the message.
443 """)
444
446 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
447
448 - def _set_content_type(self, value):
449 self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
450
451 content_type = property(_get_content_type, _set_content_type,
452 doc="""
453 The content-type of the message.
454 """)
455
457 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
458
459 - def _set_content_encoding(self, value):
460 self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
461
462 content_encoding = property(_get_content_encoding, _set_content_encoding,
463 doc="""
464 The content-encoding of the message.
465 """)
466
468 return millis2secs(pn_message_get_expiry_time(self._msg))
469
471 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
472
473 expiry_time = property(_get_expiry_time, _set_expiry_time,
474 doc="""
475 The expiry time of the message.
476 """)
477
479 return millis2secs(pn_message_get_creation_time(self._msg))
480
482 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
483
484 creation_time = property(_get_creation_time, _set_creation_time,
485 doc="""
486 The creation time of the message.
487 """)
488
490 return utf82unicode(pn_message_get_group_id(self._msg))
491
493 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
494
495 group_id = property(_get_group_id, _set_group_id,
496 doc="""
497 The group id of the message.
498 """)
499
501 return pn_message_get_group_sequence(self._msg)
502
504 self._check(pn_message_set_group_sequence(self._msg, value))
505
506 group_sequence = property(_get_group_sequence, _set_group_sequence,
507 doc="""
508 The sequence of the message within its group.
509 """)
510
512 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
513
515 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
516
517 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
518 doc="""
519 The group-id for any replies.
520 """)
521
523 self._pre_encode()
524 sz = 16
525 while True:
526 err, data = pn_message_encode(self._msg, sz)
527 if err == PN_OVERFLOW:
528 sz *= 2
529 continue
530 else:
531 self._check(err)
532 return data
533
535 self._check(pn_message_decode(self._msg, data))
536 self._post_decode()
537
538 - def send(self, sender, tag=None):
546
547 - def recv(self, link):
548 """
549 Receives and decodes the message content for the current delivery
550 from the link. Upon success it will return the current delivery
551 for the link. If there is no current delivery, or if the current
552 delivery is incomplete, or if the link is not a receiver, it will
553 return None.
554
555 @type link: Link
556 @param link: the link to receive a message from
557 @return the delivery associated with the decoded message (or None)
558
559 """
560 if link.is_sender: return None
561 dlv = link.current
562 if not dlv or dlv.partial: return None
563 dlv.encoded = link.recv(dlv.pending)
564 link.advance()
565
566
567 if link.remote_snd_settle_mode == Link.SND_SETTLED:
568 dlv.settle()
569 self.decode(dlv.encoded)
570 return dlv
571
573 props = []
574 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
575 "priority", "first_acquirer", "delivery_count", "id",
576 "correlation_id", "user_id", "group_id", "group_sequence",
577 "reply_to_group_id", "instructions", "annotations",
578 "properties", "body"):
579 value = getattr(self, attr)
580 if value: props.append("%s=%r" % (attr, value))
581 return "Message(%s)" % ", ".join(props)
582
584 tmp = pn_string(None)
585 err = pn_inspect(self._msg, tmp)
586 result = pn_string_get(tmp)
587 pn_free(tmp)
588 self._check(err)
589 return result
590
591 _DEFAULT = object()
594
595 @staticmethod
597 if impl is None:
598 return None
599 else:
600 return Selectable(impl)
601
604
607
609 if fd is _DEFAULT:
610 return pn_selectable_get_fd(self._impl)
611 elif fd is None:
612 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
613 else:
614 pn_selectable_set_fd(self._impl, fd)
615
617 return pn_selectable_is_reading(self._impl)
618
620 pn_selectable_set_reading(self._impl, bool(val))
621
622 reading = property(_is_reading, _set_reading)
623
625 return pn_selectable_is_writing(self._impl)
626
628 pn_selectable_set_writing(self._impl, bool(val))
629
630 writing = property(_is_writing, _set_writing)
631
633 tstamp = pn_selectable_get_deadline(self._impl)
634 if tstamp:
635 return millis2secs(tstamp)
636 else:
637 return None
638
640 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
641
642 deadline = property(_get_deadline, _set_deadline)
643
645 pn_selectable_readable(self._impl)
646
648 pn_selectable_writable(self._impl)
649
651 pn_selectable_expired(self._impl)
652
654 return pn_selectable_is_registered(self._impl)
655
657 pn_selectable_set_registered(self._impl, registered)
658
659 registered = property(_is_registered, _set_registered,
660 doc="""
661 The registered property may be get/set by an I/O polling system to
662 indicate whether the fd has been registered or not.
663 """)
664
665 @property
667 return pn_selectable_is_terminal(self._impl)
668
670 pn_selectable_terminate(self._impl)
671
673 pn_selectable_release(self._impl)
674
676 """
677 The DataException class is the root of the Data exception hierarchy.
678 All exceptions raised by the Data class extend this exception.
679 """
680 pass
681
683
686
688 return "UnmappedType(%s)" % self.msg
689
691
693 return "ulong(%s)" % long.__repr__(self)
694
696
698 return "timestamp(%s)" % long.__repr__(self)
699
701
703 return "symbol(%s)" % unicode.__repr__(self)
704
705 -class char(unicode):
706
708 return "char(%s)" % unicode.__repr__(self)
709
711
713 return "byte(%s)" % int.__repr__(self)
714
716
718 return "short(%s)" % int.__repr__(self)
719
721
723 return "int32(%s)" % int.__repr__(self)
724
726
728 return "ubyte(%s)" % int.__repr__(self)
729
731
733 return "ushort(%s)" % int.__repr__(self)
734
736
738 return "uint(%s)" % long.__repr__(self)
739
741
743 return "float32(%s)" % float.__repr__(self)
744
746
748 return "decimal32(%s)" % int.__repr__(self)
749
751
753 return "decimal64(%s)" % long.__repr__(self)
754
756
758 return "decimal128(%s)" % bytes.__repr__(self)
759
761
763 self.descriptor = descriptor
764 self.value = value
765
767 return "Described(%r, %r)" % (self.descriptor, self.value)
768
770 if isinstance(o, Described):
771 return self.descriptor == o.descriptor and self.value == o.value
772 else:
773 return False
774
775 UNDESCRIBED = Constant("UNDESCRIBED")
778
779 - def __init__(self, descriptor, type, *elements):
780 self.descriptor = descriptor
781 self.type = type
782 self.elements = elements
783
785 return iter(self.elements)
786
788 if self.elements:
789 els = ", %s" % (", ".join(map(repr, self.elements)))
790 else:
791 els = ""
792 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
793
795 if isinstance(o, Array):
796 return self.descriptor == o.descriptor and \
797 self.type == o.type and self.elements == o.elements
798 else:
799 return False
800
802 """
803 The L{Data} class provides an interface for decoding, extracting,
804 creating, and encoding arbitrary AMQP data. A L{Data} object
805 contains a tree of AMQP values. Leaf nodes in this tree correspond
806 to scalars in the AMQP type system such as L{ints<INT>} or
807 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
808 compound values in the AMQP type system such as L{lists<LIST>},
809 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
810 The root node of the tree is the L{Data} object itself and can have
811 an arbitrary number of children.
812
813 A L{Data} object maintains the notion of the current sibling node
814 and a current parent node. Siblings are ordered within their parent.
815 Values are accessed and/or added by using the L{next}, L{prev},
816 L{enter}, and L{exit} methods to navigate to the desired location in
817 the tree and using the supplied variety of put_*/get_* methods to
818 access or add a value of the desired type.
819
820 The put_* methods will always add a value I{after} the current node
821 in the tree. If the current node has a next sibling the put_* method
822 will overwrite the value on this node. If there is no current node
823 or the current node has no next sibling then one will be added. The
824 put_* methods always set the added/modified node to the current
825 node. The get_* methods read the value of the current node and do
826 not change which node is current.
827
828 The following types of scalar values are supported:
829
830 - L{NULL}
831 - L{BOOL}
832 - L{UBYTE}
833 - L{USHORT}
834 - L{SHORT}
835 - L{UINT}
836 - L{INT}
837 - L{ULONG}
838 - L{LONG}
839 - L{FLOAT}
840 - L{DOUBLE}
841 - L{BINARY}
842 - L{STRING}
843 - L{SYMBOL}
844
845 The following types of compound values are supported:
846
847 - L{DESCRIBED}
848 - L{ARRAY}
849 - L{LIST}
850 - L{MAP}
851 """
852
853 NULL = PN_NULL; "A null value."
854 BOOL = PN_BOOL; "A boolean value."
855 UBYTE = PN_UBYTE; "An unsigned byte value."
856 BYTE = PN_BYTE; "A signed byte value."
857 USHORT = PN_USHORT; "An unsigned short value."
858 SHORT = PN_SHORT; "A short value."
859 UINT = PN_UINT; "An unsigned int value."
860 INT = PN_INT; "A signed int value."
861 CHAR = PN_CHAR; "A character value."
862 ULONG = PN_ULONG; "An unsigned long value."
863 LONG = PN_LONG; "A signed long value."
864 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
865 FLOAT = PN_FLOAT; "A float value."
866 DOUBLE = PN_DOUBLE; "A double value."
867 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
868 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
869 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
870 UUID = PN_UUID; "A UUID value."
871 BINARY = PN_BINARY; "A binary string."
872 STRING = PN_STRING; "A unicode string."
873 SYMBOL = PN_SYMBOL; "A symbolic string."
874 DESCRIBED = PN_DESCRIBED; "A described value."
875 ARRAY = PN_ARRAY; "An array value."
876 LIST = PN_LIST; "A list value."
877 MAP = PN_MAP; "A map value."
878
879 type_names = {
880 NULL: "null",
881 BOOL: "bool",
882 BYTE: "byte",
883 UBYTE: "ubyte",
884 SHORT: "short",
885 USHORT: "ushort",
886 INT: "int",
887 UINT: "uint",
888 CHAR: "char",
889 LONG: "long",
890 ULONG: "ulong",
891 TIMESTAMP: "timestamp",
892 FLOAT: "float",
893 DOUBLE: "double",
894 DECIMAL32: "decimal32",
895 DECIMAL64: "decimal64",
896 DECIMAL128: "decimal128",
897 UUID: "uuid",
898 BINARY: "binary",
899 STRING: "string",
900 SYMBOL: "symbol",
901 DESCRIBED: "described",
902 ARRAY: "array",
903 LIST: "list",
904 MAP: "map"
905 }
906
907 @classmethod
909
917
919 if self._free and hasattr(self, "_data"):
920 pn_data_free(self._data)
921 del self._data
922
924 if err < 0:
925 exc = EXCEPTIONS.get(err, DataException)
926 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
927 else:
928 return err
929
931 """
932 Clears the data object.
933 """
934 pn_data_clear(self._data)
935
937 """
938 Clears current node and sets the parent to the root node. Clearing the
939 current node sets it _before_ the first node, calling next() will advance to
940 the first node.
941 """
942 assert self._data is not None
943 pn_data_rewind(self._data)
944
946 """
947 Advances the current node to its next sibling and returns its
948 type. If there is no next sibling the current node remains
949 unchanged and None is returned.
950 """
951 found = pn_data_next(self._data)
952 if found:
953 return self.type()
954 else:
955 return None
956
958 """
959 Advances the current node to its previous sibling and returns its
960 type. If there is no previous sibling the current node remains
961 unchanged and None is returned.
962 """
963 found = pn_data_prev(self._data)
964 if found:
965 return self.type()
966 else:
967 return None
968
970 """
971 Sets the parent node to the current node and clears the current node.
972 Clearing the current node sets it _before_ the first child,
973 call next() advances to the first child.
974 """
975 return pn_data_enter(self._data)
976
978 """
979 Sets the current node to the parent node and the parent node to
980 its own parent.
981 """
982 return pn_data_exit(self._data)
983
985 return pn_data_lookup(self._data, name)
986
988 pn_data_narrow(self._data)
989
991 pn_data_widen(self._data)
992
994 """
995 Returns the type of the current node.
996 """
997 dtype = pn_data_type(self._data)
998 if dtype == -1:
999 return None
1000 else:
1001 return dtype
1002
1004 """
1005 Returns the size in bytes needed to encode the data in AMQP format.
1006 """
1007 return pn_data_encoded_size(self._data)
1008
1010 """
1011 Returns a representation of the data encoded in AMQP format.
1012 """
1013 size = 1024
1014 while True:
1015 cd, enc = pn_data_encode(self._data, size)
1016 if cd == PN_OVERFLOW:
1017 size *= 2
1018 elif cd >= 0:
1019 return enc
1020 else:
1021 self._check(cd)
1022
1024 """
1025 Decodes the first value from supplied AMQP data and returns the
1026 number of bytes consumed.
1027
1028 @type encoded: binary
1029 @param encoded: AMQP encoded binary data
1030 """
1031 return self._check(pn_data_decode(self._data, encoded))
1032
1034 """
1035 Puts a list value. Elements may be filled by entering the list
1036 node and putting element values.
1037
1038 >>> data = Data()
1039 >>> data.put_list()
1040 >>> data.enter()
1041 >>> data.put_int(1)
1042 >>> data.put_int(2)
1043 >>> data.put_int(3)
1044 >>> data.exit()
1045 """
1046 self._check(pn_data_put_list(self._data))
1047
1049 """
1050 Puts a map value. Elements may be filled by entering the map node
1051 and putting alternating key value pairs.
1052
1053 >>> data = Data()
1054 >>> data.put_map()
1055 >>> data.enter()
1056 >>> data.put_string("key")
1057 >>> data.put_string("value")
1058 >>> data.exit()
1059 """
1060 self._check(pn_data_put_map(self._data))
1061
1062 - def put_array(self, described, element_type):
1063 """
1064 Puts an array value. Elements may be filled by entering the array
1065 node and putting the element values. The values must all be of the
1066 specified array element type. If an array is described then the
1067 first child value of the array is the descriptor and may be of any
1068 type.
1069
1070 >>> data = Data()
1071 >>>
1072 >>> data.put_array(False, Data.INT)
1073 >>> data.enter()
1074 >>> data.put_int(1)
1075 >>> data.put_int(2)
1076 >>> data.put_int(3)
1077 >>> data.exit()
1078 >>>
1079 >>> data.put_array(True, Data.DOUBLE)
1080 >>> data.enter()
1081 >>> data.put_symbol("array-descriptor")
1082 >>> data.put_double(1.1)
1083 >>> data.put_double(1.2)
1084 >>> data.put_double(1.3)
1085 >>> data.exit()
1086
1087 @type described: bool
1088 @param described: specifies whether the array is described
1089 @type element_type: int
1090 @param element_type: the type of the array elements
1091 """
1092 self._check(pn_data_put_array(self._data, described, element_type))
1093
1095 """
1096 Puts a described value. A described node has two children, the
1097 descriptor and the value. These are specified by entering the node
1098 and putting the desired values.
1099
1100 >>> data = Data()
1101 >>> data.put_described()
1102 >>> data.enter()
1103 >>> data.put_symbol("value-descriptor")
1104 >>> data.put_string("the value")
1105 >>> data.exit()
1106 """
1107 self._check(pn_data_put_described(self._data))
1108
1110 """
1111 Puts a null value.
1112 """
1113 self._check(pn_data_put_null(self._data))
1114
1116 """
1117 Puts a boolean value.
1118
1119 @param b: a boolean value
1120 """
1121 self._check(pn_data_put_bool(self._data, b))
1122
1124 """
1125 Puts an unsigned byte value.
1126
1127 @param ub: an integral value
1128 """
1129 self._check(pn_data_put_ubyte(self._data, ub))
1130
1132 """
1133 Puts a signed byte value.
1134
1135 @param b: an integral value
1136 """
1137 self._check(pn_data_put_byte(self._data, b))
1138
1140 """
1141 Puts an unsigned short value.
1142
1143 @param us: an integral value.
1144 """
1145 self._check(pn_data_put_ushort(self._data, us))
1146
1148 """
1149 Puts a signed short value.
1150
1151 @param s: an integral value
1152 """
1153 self._check(pn_data_put_short(self._data, s))
1154
1156 """
1157 Puts an unsigned int value.
1158
1159 @param ui: an integral value
1160 """
1161 self._check(pn_data_put_uint(self._data, ui))
1162
1164 """
1165 Puts a signed int value.
1166
1167 @param i: an integral value
1168 """
1169 self._check(pn_data_put_int(self._data, i))
1170
1172 """
1173 Puts a char value.
1174
1175 @param c: a single character
1176 """
1177 self._check(pn_data_put_char(self._data, ord(c)))
1178
1180 """
1181 Puts an unsigned long value.
1182
1183 @param ul: an integral value
1184 """
1185 self._check(pn_data_put_ulong(self._data, ul))
1186
1188 """
1189 Puts a signed long value.
1190
1191 @param l: an integral value
1192 """
1193 self._check(pn_data_put_long(self._data, l))
1194
1196 """
1197 Puts a timestamp value.
1198
1199 @param t: an integral value
1200 """
1201 self._check(pn_data_put_timestamp(self._data, t))
1202
1204 """
1205 Puts a float value.
1206
1207 @param f: a floating point value
1208 """
1209 self._check(pn_data_put_float(self._data, f))
1210
1212 """
1213 Puts a double value.
1214
1215 @param d: a floating point value.
1216 """
1217 self._check(pn_data_put_double(self._data, d))
1218
1220 """
1221 Puts a decimal32 value.
1222
1223 @param d: a decimal32 value
1224 """
1225 self._check(pn_data_put_decimal32(self._data, d))
1226
1228 """
1229 Puts a decimal64 value.
1230
1231 @param d: a decimal64 value
1232 """
1233 self._check(pn_data_put_decimal64(self._data, d))
1234
1236 """
1237 Puts a decimal128 value.
1238
1239 @param d: a decimal128 value
1240 """
1241 self._check(pn_data_put_decimal128(self._data, d))
1242
1244 """
1245 Puts a UUID value.
1246
1247 @param u: a uuid value
1248 """
1249 self._check(pn_data_put_uuid(self._data, u.bytes))
1250
1252 """
1253 Puts a binary value.
1254
1255 @type b: binary
1256 @param b: a binary value
1257 """
1258 self._check(pn_data_put_binary(self._data, b))
1259
1261 """Put a python memoryview object as an AMQP binary value"""
1262 self.put_binary(mv.tobytes())
1263
1265 """Put a python buffer object as an AMQP binary value"""
1266 self.put_binary(bytes(buff))
1267
1269 """
1270 Puts a unicode value.
1271
1272 @type s: unicode
1273 @param s: a unicode value
1274 """
1275 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1276
1278 """
1279 Puts a symbolic value.
1280
1281 @type s: string
1282 @param s: the symbol name
1283 """
1284 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1285
1287 """
1288 If the current node is a list, return the number of elements,
1289 otherwise return zero. List elements can be accessed by entering
1290 the list.
1291
1292 >>> count = data.get_list()
1293 >>> data.enter()
1294 >>> for i in range(count):
1295 ... type = data.next()
1296 ... if type == Data.STRING:
1297 ... print data.get_string()
1298 ... elif type == ...:
1299 ... ...
1300 >>> data.exit()
1301 """
1302 return pn_data_get_list(self._data)
1303
1305 """
1306 If the current node is a map, return the number of child elements,
1307 otherwise return zero. Key value pairs can be accessed by entering
1308 the map.
1309
1310 >>> count = data.get_map()
1311 >>> data.enter()
1312 >>> for i in range(count/2):
1313 ... type = data.next()
1314 ... if type == Data.STRING:
1315 ... print data.get_string()
1316 ... elif type == ...:
1317 ... ...
1318 >>> data.exit()
1319 """
1320 return pn_data_get_map(self._data)
1321
1323 """
1324 If the current node is an array, return a tuple of the element
1325 count, a boolean indicating whether the array is described, and
1326 the type of each element, otherwise return (0, False, None). Array
1327 data can be accessed by entering the array.
1328
1329 >>> # read an array of strings with a symbolic descriptor
1330 >>> count, described, type = data.get_array()
1331 >>> data.enter()
1332 >>> data.next()
1333 >>> print "Descriptor:", data.get_symbol()
1334 >>> for i in range(count):
1335 ... data.next()
1336 ... print "Element:", data.get_string()
1337 >>> data.exit()
1338 """
1339 count = pn_data_get_array(self._data)
1340 described = pn_data_is_array_described(self._data)
1341 type = pn_data_get_array_type(self._data)
1342 if type == -1:
1343 type = None
1344 return count, described, type
1345
1347 """
1348 Checks if the current node is a described value. The descriptor
1349 and value may be accessed by entering the described value.
1350
1351 >>> # read a symbolically described string
1352 >>> assert data.is_described() # will error if the current node is not described
1353 >>> data.enter()
1354 >>> data.next()
1355 >>> print data.get_symbol()
1356 >>> data.next()
1357 >>> print data.get_string()
1358 >>> data.exit()
1359 """
1360 return pn_data_is_described(self._data)
1361
1363 """
1364 Checks if the current node is a null.
1365 """
1366 return pn_data_is_null(self._data)
1367
1369 """
1370 If the current node is a boolean, returns its value, returns False
1371 otherwise.
1372 """
1373 return pn_data_get_bool(self._data)
1374
1376 """
1377 If the current node is an unsigned byte, returns its value,
1378 returns 0 otherwise.
1379 """
1380 return ubyte(pn_data_get_ubyte(self._data))
1381
1383 """
1384 If the current node is a signed byte, returns its value, returns 0
1385 otherwise.
1386 """
1387 return byte(pn_data_get_byte(self._data))
1388
1390 """
1391 If the current node is an unsigned short, returns its value,
1392 returns 0 otherwise.
1393 """
1394 return ushort(pn_data_get_ushort(self._data))
1395
1397 """
1398 If the current node is a signed short, returns its value, returns
1399 0 otherwise.
1400 """
1401 return short(pn_data_get_short(self._data))
1402
1404 """
1405 If the current node is an unsigned int, returns its value, returns
1406 0 otherwise.
1407 """
1408 return uint(pn_data_get_uint(self._data))
1409
1411 """
1412 If the current node is a signed int, returns its value, returns 0
1413 otherwise.
1414 """
1415 return int32(pn_data_get_int(self._data))
1416
1418 """
1419 If the current node is a char, returns its value, returns 0
1420 otherwise.
1421 """
1422 return char(_compat.unichar(pn_data_get_char(self._data)))
1423
1425 """
1426 If the current node is an unsigned long, returns its value,
1427 returns 0 otherwise.
1428 """
1429 return ulong(pn_data_get_ulong(self._data))
1430
1432 """
1433 If the current node is an signed long, returns its value, returns
1434 0 otherwise.
1435 """
1436 return long(pn_data_get_long(self._data))
1437
1439 """
1440 If the current node is a timestamp, returns its value, returns 0
1441 otherwise.
1442 """
1443 return timestamp(pn_data_get_timestamp(self._data))
1444
1446 """
1447 If the current node is a float, returns its value, raises 0
1448 otherwise.
1449 """
1450 return float32(pn_data_get_float(self._data))
1451
1453 """
1454 If the current node is a double, returns its value, returns 0
1455 otherwise.
1456 """
1457 return pn_data_get_double(self._data)
1458
1459
1461 """
1462 If the current node is a decimal32, returns its value, returns 0
1463 otherwise.
1464 """
1465 return decimal32(pn_data_get_decimal32(self._data))
1466
1467
1469 """
1470 If the current node is a decimal64, returns its value, returns 0
1471 otherwise.
1472 """
1473 return decimal64(pn_data_get_decimal64(self._data))
1474
1475
1477 """
1478 If the current node is a decimal128, returns its value, returns 0
1479 otherwise.
1480 """
1481 return decimal128(pn_data_get_decimal128(self._data))
1482
1484 """
1485 If the current node is a UUID, returns its value, returns None
1486 otherwise.
1487 """
1488 if pn_data_type(self._data) == Data.UUID:
1489 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
1490 else:
1491 return None
1492
1494 """
1495 If the current node is binary, returns its value, returns ""
1496 otherwise.
1497 """
1498 return pn_data_get_binary(self._data)
1499
1501 """
1502 If the current node is a string, returns its value, returns ""
1503 otherwise.
1504 """
1505 return pn_data_get_string(self._data).decode("utf8")
1506
1508 """
1509 If the current node is a symbol, returns its value, returns ""
1510 otherwise.
1511 """
1512 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
1513
1514 - def copy(self, src):
1515 self._check(pn_data_copy(self._data, src._data))
1516
1527
1529 pn_data_dump(self._data)
1530
1540
1542 if self.enter():
1543 try:
1544 result = {}
1545 while self.next():
1546 k = self.get_object()
1547 if self.next():
1548 v = self.get_object()
1549 else:
1550 v = None
1551 result[k] = v
1552 finally:
1553 self.exit()
1554 return result
1555
1564
1566 if self.enter():
1567 try:
1568 result = []
1569 while self.next():
1570 result.append(self.get_object())
1571 finally:
1572 self.exit()
1573 return result
1574
1585
1594
1596 """
1597 If the current node is an array, return an Array object
1598 representing the array and its contents. Otherwise return None.
1599 This is a convenience wrapper around get_array, enter, etc.
1600 """
1601
1602 count, described, type = self.get_array()
1603 if type is None: return None
1604 if self.enter():
1605 try:
1606 if described:
1607 self.next()
1608 descriptor = self.get_object()
1609 else:
1610 descriptor = UNDESCRIBED
1611 elements = []
1612 while self.next():
1613 elements.append(self.get_object())
1614 finally:
1615 self.exit()
1616 return Array(descriptor, type, *elements)
1617
1629
1630 put_mappings = {
1631 None.__class__: lambda s, _: s.put_null(),
1632 bool: put_bool,
1633 ubyte: put_ubyte,
1634 ushort: put_ushort,
1635 uint: put_uint,
1636 ulong: put_ulong,
1637 byte: put_byte,
1638 short: put_short,
1639 int32: put_int,
1640 long: put_long,
1641 float32: put_float,
1642 float: put_double,
1643 decimal32: put_decimal32,
1644 decimal64: put_decimal64,
1645 decimal128: put_decimal128,
1646 char: put_char,
1647 timestamp: put_timestamp,
1648 uuid.UUID: put_uuid,
1649 bytes: put_binary,
1650 unicode: put_string,
1651 symbol: put_symbol,
1652 list: put_sequence,
1653 tuple: put_sequence,
1654 dict: put_dict,
1655 Described: put_py_described,
1656 Array: put_py_array
1657 }
1658
1659
1660 if int not in put_mappings:
1661 put_mappings[int] = put_int
1662
1663 try: put_mappings[memoryview] = put_memoryview
1664 except NameError: pass
1665 try: put_mappings[buffer] = put_buffer
1666 except NameError: pass
1667 get_mappings = {
1668 NULL: lambda s: None,
1669 BOOL: get_bool,
1670 BYTE: get_byte,
1671 UBYTE: get_ubyte,
1672 SHORT: get_short,
1673 USHORT: get_ushort,
1674 INT: get_int,
1675 UINT: get_uint,
1676 CHAR: get_char,
1677 LONG: get_long,
1678 ULONG: get_ulong,
1679 TIMESTAMP: get_timestamp,
1680 FLOAT: get_float,
1681 DOUBLE: get_double,
1682 DECIMAL32: get_decimal32,
1683 DECIMAL64: get_decimal64,
1684 DECIMAL128: get_decimal128,
1685 UUID: get_uuid,
1686 BINARY: get_binary,
1687 STRING: get_string,
1688 SYMBOL: get_symbol,
1689 DESCRIBED: get_py_described,
1690 ARRAY: get_py_array,
1691 LIST: get_sequence,
1692 MAP: get_dict
1693 }
1694
1695
1697 putter = self.put_mappings[obj.__class__]
1698 putter(self, obj)
1699
1701 type = self.type()
1702 if type is None: return None
1703 getter = self.get_mappings.get(type)
1704 if getter:
1705 return getter(self)
1706 else:
1707 return UnmappedType(str(type))
1708
1711
1765
1767
1768 - def __init__(self, name, description=None, info=None):
1769 self.name = name
1770 self.description = description
1771 self.info = info
1772
1774 return "Condition(%s)" % ", ".join([repr(x) for x in
1775 (self.name, self.description, self.info)
1776 if x])
1777
1779 if not isinstance(o, Condition): return False
1780 return self.name == o.name and \
1781 self.description == o.description and \
1782 self.info == o.info
1783
1785 pn_condition_clear(cond)
1786 if obj:
1787 pn_condition_set_name(cond, str(obj.name))
1788 pn_condition_set_description(cond, obj.description)
1789 info = Data(pn_condition_info(cond))
1790 if obj.info:
1791 info.put_object(obj.info)
1792
1794 if pn_condition_is_set(cond):
1795 return Condition(pn_condition_get_name(cond),
1796 pn_condition_get_description(cond),
1797 dat2obj(pn_condition_info(cond)))
1798 else:
1799 return None
1800
1809
1814
1816 return long(secs*1000)
1817
1819 return float(millis)/1000.0
1820
1822 if secs is None: return PN_MILLIS_MAX
1823 return secs2millis(secs)
1824
1826 if millis == PN_MILLIS_MAX: return None
1827 return millis2secs(millis)
1828
1830 """Some Proton APIs expect a null terminated string. Convert python text
1831 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
1832 This method will throw if the string cannot be converted.
1833 """
1834 if string is None:
1835 return None
1836 if _compat.IS_PY2:
1837 if isinstance(string, unicode):
1838 return string.encode('utf-8')
1839 elif isinstance(string, str):
1840 return string
1841 else:
1842
1843 if isinstance(string, str):
1844 string = string.encode('utf-8')
1845
1846 if isinstance(string, bytes):
1847 return string.decode('utf-8')
1848 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
1849
1851 """Covert C strings returned from proton-c into python unicode"""
1852 if string is None:
1853 return None
1854 if isinstance(string, _compat.TEXT_TYPES):
1855
1856 return string
1857 elif isinstance(string, _compat.BINARY_TYPES):
1858 return string.decode('utf8')
1859 else:
1860 raise TypeError("Unrecognized string type")
1861
1863 """
1864 A representation of an AMQP connection
1865 """
1866
1867 @staticmethod
1869 if impl is None:
1870 return None
1871 else:
1872 return Connection(impl)
1873
1874 - def __init__(self, impl = pn_connection):
1876
1878 Endpoint._init(self)
1879 self.offered_capabilities = None
1880 self.desired_capabilities = None
1881 self.properties = None
1882
1884 return pn_connection_attachments(self._impl)
1885
1886 @property
1889
1890 @property
1893
1895 if err < 0:
1896 exc = EXCEPTIONS.get(err, ConnectionException)
1897 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
1898 else:
1899 return err
1900
1902 return pn_connection_condition(self._impl)
1903
1905 return pn_connection_remote_condition(self._impl)
1906
1908 if collector is None:
1909 pn_connection_collect(self._impl, None)
1910 else:
1911 pn_connection_collect(self._impl, collector._impl)
1912 self._collector = weakref.ref(collector)
1913
1915 return utf82unicode(pn_connection_get_container(self._impl))
1917 return pn_connection_set_container(self._impl, unicode2utf8(name))
1918
1919 container = property(_get_container, _set_container)
1920
1922 return utf82unicode(pn_connection_get_hostname(self._impl))
1924 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
1925
1926 hostname = property(_get_hostname, _set_hostname,
1927 doc="""
1928 Set the name of the host (either fully qualified or relative) to which this
1929 connection is connecting to. This information may be used by the remote
1930 peer to determine the correct back-end service to connect the client to.
1931 This value will be sent in the Open performative, and will be used by SSL
1932 and SASL layers to identify the peer.
1933 """)
1934
1936 return utf82unicode(pn_connection_get_user(self._impl))
1938 return pn_connection_set_user(self._impl, unicode2utf8(name))
1939
1940 user = property(_get_user, _set_user)
1941
1945 return pn_connection_set_password(self._impl, unicode2utf8(name))
1946
1947 password = property(_get_password, _set_password)
1948
1949 @property
1951 """The container identifier specified by the remote peer for this connection."""
1952 return pn_connection_remote_container(self._impl)
1953
1954 @property
1956 """The hostname specified by the remote peer for this connection."""
1957 return pn_connection_remote_hostname(self._impl)
1958
1959 @property
1961 """The capabilities offered by the remote peer for this connection."""
1962 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
1963
1964 @property
1966 """The capabilities desired by the remote peer for this connection."""
1967 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
1968
1969 @property
1971 """The properties specified by the remote peer for this connection."""
1972 return dat2obj(pn_connection_remote_properties(self._impl))
1973
1975 """
1976 Opens the connection.
1977
1978 In more detail, this moves the local state of the connection to
1979 the ACTIVE state and triggers an open frame to be sent to the
1980 peer. A connection is fully active once both peers have opened it.
1981 """
1982 obj2dat(self.offered_capabilities,
1983 pn_connection_offered_capabilities(self._impl))
1984 obj2dat(self.desired_capabilities,
1985 pn_connection_desired_capabilities(self._impl))
1986 obj2dat(self.properties, pn_connection_properties(self._impl))
1987 pn_connection_open(self._impl)
1988
1990 """
1991 Closes the connection.
1992
1993 In more detail, this moves the local state of the connection to
1994 the CLOSED state and triggers a close frame to be sent to the
1995 peer. A connection is fully closed once both peers have closed it.
1996 """
1997 self._update_cond()
1998 pn_connection_close(self._impl)
1999 if hasattr(self, '_session_policy'):
2000
2001 del self._session_policy
2002
2003 @property
2005 """
2006 The state of the connection as a bit field. The state has a local
2007 and a remote component. Each of these can be in one of three
2008 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2009 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2010 REMOTE_ACTIVE and REMOTE_CLOSED.
2011 """
2012 return pn_connection_state(self._impl)
2013
2015 """
2016 Returns a new session on this connection.
2017 """
2018 ssn = pn_session(self._impl)
2019 if ssn is None:
2020 raise(SessionException("Session allocation failed."))
2021 else:
2022 return Session(ssn)
2023
2025 return Session.wrap(pn_session_head(self._impl, mask))
2026
2028 return Link.wrap(pn_link_head(self._impl, mask))
2029
2030 @property
2033
2034 @property
2036 return pn_error_code(pn_connection_error(self._impl))
2037
2039 pn_connection_release(self._impl)
2040
2043
2045
2046 @staticmethod
2048 if impl is None:
2049 return None
2050 else:
2051 return Session(impl)
2052
2055
2057 return pn_session_attachments(self._impl)
2058
2060 return pn_session_condition(self._impl)
2061
2063 return pn_session_remote_condition(self._impl)
2064
2066 return pn_session_get_incoming_capacity(self._impl)
2067
2069 pn_session_set_incoming_capacity(self._impl, capacity)
2070
2071 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2072
2074 return pn_session_get_outgoing_window(self._impl)
2075
2077 pn_session_set_outgoing_window(self._impl, window)
2078
2079 outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
2080
2081 @property
2083 return pn_session_outgoing_bytes(self._impl)
2084
2085 @property
2087 return pn_session_incoming_bytes(self._impl)
2088
2090 pn_session_open(self._impl)
2091
2093 self._update_cond()
2094 pn_session_close(self._impl)
2095
2096 - def next(self, mask):
2097 return Session.wrap(pn_session_next(self._impl, mask))
2098
2099 @property
2101 return pn_session_state(self._impl)
2102
2103 @property
2106
2108 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2109
2111 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2112
2114 pn_session_free(self._impl)
2115
2118
2119 -class Link(Wrapper, Endpoint):
2120 """
2121 A representation of an AMQP link, of which there are two concrete
2122 implementations, Sender and Receiver.
2123 """
2124
2125 SND_UNSETTLED = PN_SND_UNSETTLED
2126 SND_SETTLED = PN_SND_SETTLED
2127 SND_MIXED = PN_SND_MIXED
2128
2129 RCV_FIRST = PN_RCV_FIRST
2130 RCV_SECOND = PN_RCV_SECOND
2131
2132 @staticmethod
2134 if impl is None: return None
2135 if pn_link_is_sender(impl):
2136 return Sender(impl)
2137 else:
2138 return Receiver(impl)
2139
2142
2144 return pn_link_attachments(self._impl)
2145
2147 if err < 0:
2148 exc = EXCEPTIONS.get(err, LinkException)
2149 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
2150 else:
2151 return err
2152
2154 return pn_link_condition(self._impl)
2155
2157 return pn_link_remote_condition(self._impl)
2158
2160 """
2161 Opens the link.
2162
2163 In more detail, this moves the local state of the link to the
2164 ACTIVE state and triggers an attach frame to be sent to the
2165 peer. A link is fully active once both peers have attached it.
2166 """
2167 pn_link_open(self._impl)
2168
2170 """
2171 Closes the link.
2172
2173 In more detail, this moves the local state of the link to the
2174 CLOSED state and triggers an detach frame (with the closed flag
2175 set) to be sent to the peer. A link is fully closed once both
2176 peers have detached it.
2177 """
2178 self._update_cond()
2179 pn_link_close(self._impl)
2180
2181 @property
2183 """
2184 The state of the link as a bit field. The state has a local
2185 and a remote component. Each of these can be in one of three
2186 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2187 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2188 REMOTE_ACTIVE and REMOTE_CLOSED.
2189 """
2190 return pn_link_state(self._impl)
2191
2192 @property
2194 """The source of the link as described by the local peer."""
2195 return Terminus(pn_link_source(self._impl))
2196
2197 @property
2199 """The target of the link as described by the local peer."""
2200 return Terminus(pn_link_target(self._impl))
2201
2202 @property
2204 """The source of the link as described by the remote peer."""
2205 return Terminus(pn_link_remote_source(self._impl))
2206 @property
2208 """The target of the link as described by the remote peer."""
2209 return Terminus(pn_link_remote_target(self._impl))
2210
2211 @property
2214
2215 @property
2217 """The connection on which this link was attached."""
2218 return self.session.connection
2219
2222
2223 @property
2226
2228 return pn_link_advance(self._impl)
2229
2230 @property
2232 return pn_link_unsettled(self._impl)
2233
2234 @property
2236 """The amount of outstanding credit on this link."""
2237 return pn_link_credit(self._impl)
2238
2239 @property
2241 return pn_link_available(self._impl)
2242
2243 @property
2245 return pn_link_queued(self._impl)
2246
2247 - def next(self, mask):
2248 return Link.wrap(pn_link_next(self._impl, mask))
2249
2250 @property
2252 """Returns the name of the link"""
2253 return utf82unicode(pn_link_name(self._impl))
2254
2255 @property
2257 """Returns true if this link is a sender."""
2258 return pn_link_is_sender(self._impl)
2259
2260 @property
2262 """Returns true if this link is a receiver."""
2263 return pn_link_is_receiver(self._impl)
2264
2265 @property
2267 return pn_link_remote_snd_settle_mode(self._impl)
2268
2269 @property
2271 return pn_link_remote_rcv_settle_mode(self._impl)
2272
2274 return pn_link_snd_settle_mode(self._impl)
2276 pn_link_set_snd_settle_mode(self._impl, mode)
2277 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2278
2280 return pn_link_rcv_settle_mode(self._impl)
2282 pn_link_set_rcv_settle_mode(self._impl, mode)
2283 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2284
2286 return pn_link_get_drain(self._impl)
2287
2289 pn_link_set_drain(self._impl, bool(b))
2290
2291 drain_mode = property(_get_drain, _set_drain)
2292
2294 return pn_link_drained(self._impl)
2295
2296 @property
2298 return pn_link_remote_max_message_size(self._impl)
2299
2301 return pn_link_max_message_size(self._impl)
2303 pn_link_set_max_message_size(self._impl, mode)
2304 max_message_size = property(_get_max_message_size, _set_max_message_size)
2305
2307 return pn_link_detach(self._impl)
2308
2310 pn_link_free(self._impl)
2311
2313
2314 UNSPECIFIED = PN_UNSPECIFIED
2315 SOURCE = PN_SOURCE
2316 TARGET = PN_TARGET
2317 COORDINATOR = PN_COORDINATOR
2318
2319 NONDURABLE = PN_NONDURABLE
2320 CONFIGURATION = PN_CONFIGURATION
2321 DELIVERIES = PN_DELIVERIES
2322
2323 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2324 DIST_MODE_COPY = PN_DIST_MODE_COPY
2325 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2326
2327 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
2328 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
2329 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
2330 EXPIRE_NEVER = PN_EXPIRE_NEVER
2331
2334
2336 if err < 0:
2337 exc = EXCEPTIONS.get(err, LinkException)
2338 raise exc("[%s]" % err)
2339 else:
2340 return err
2341
2343 return pn_terminus_get_type(self._impl)
2345 self._check(pn_terminus_set_type(self._impl, type))
2346 type = property(_get_type, _set_type)
2347
2349 """The address that identifies the source or target node"""
2350 return utf82unicode(pn_terminus_get_address(self._impl))
2352 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2353 address = property(_get_address, _set_address)
2354
2356 return pn_terminus_get_durability(self._impl)
2358 self._check(pn_terminus_set_durability(self._impl, seconds))
2359 durability = property(_get_durability, _set_durability)
2360
2362 return pn_terminus_get_expiry_policy(self._impl)
2364 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2365 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2366
2368 return pn_terminus_get_timeout(self._impl)
2370 self._check(pn_terminus_set_timeout(self._impl, seconds))
2371 timeout = property(_get_timeout, _set_timeout)
2372
2374 """Indicates whether the source or target node was dynamically
2375 created"""
2376 return pn_terminus_is_dynamic(self._impl)
2378 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2379 dynamic = property(_is_dynamic, _set_dynamic)
2380
2382 return pn_terminus_get_distribution_mode(self._impl)
2384 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2385 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2386
2387 @property
2389 """Properties of a dynamic source or target."""
2390 return Data(pn_terminus_properties(self._impl))
2391
2392 @property
2394 """Capabilities of the source or target."""
2395 return Data(pn_terminus_capabilities(self._impl))
2396
2397 @property
2399 return Data(pn_terminus_outcomes(self._impl))
2400
2401 @property
2403 """A filter on a source allows the set of messages transfered over
2404 the link to be restricted"""
2405 return Data(pn_terminus_filter(self._impl))
2406
2407 - def copy(self, src):
2408 self._check(pn_terminus_copy(self._impl, src._impl))
2409
2411 """
2412 A link over which messages are sent.
2413 """
2414
2416 pn_link_offered(self._impl, n)
2417
2419 """
2420 Send specified data as part of the current delivery
2421
2422 @type data: binary
2423 @param data: data to send
2424 """
2425 return self._check(pn_link_send(self._impl, data))
2426
2427 - def send(self, obj, tag=None):
2428 """
2429 Send specified object over this sender; the object is expected to
2430 have a send() method on it that takes the sender and an optional
2431 tag as arguments.
2432
2433 Where the object is a Message, this will send the message over
2434 this link, creating a new delivery for the purpose.
2435 """
2436 if hasattr(obj, 'send'):
2437 return obj.send(self, tag=tag)
2438 else:
2439
2440 return self.stream(obj)
2441
2443 if not hasattr(self, 'tag_generator'):
2444 def simple_tags():
2445 count = 1
2446 while True:
2447 yield str(count)
2448 count += 1
2449 self.tag_generator = simple_tags()
2450 return next(self.tag_generator)
2451
2453 """
2454 A link over which messages are received.
2455 """
2456
2457 - def flow(self, n):
2458 """Increases the credit issued to the remote sender by the specified number of messages."""
2459 pn_link_flow(self._impl, n)
2460
2461 - def recv(self, limit):
2462 n, binary = pn_link_recv(self._impl, limit)
2463 if n == PN_EOS:
2464 return None
2465 else:
2466 self._check(n)
2467 return binary
2468
2470 pn_link_drain(self._impl, n)
2471
2473 return pn_link_draining(self._impl)
2474
2476
2477 values = {}
2478
2480 ni = super(NamedInt, cls).__new__(cls, i)
2481 cls.values[i] = ni
2482 return ni
2483
2486
2489
2492
2493 @classmethod
2495 return cls.values.get(i, i)
2496
2499
2501
2502 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
2503 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
2504 REJECTED = DispositionType(PN_REJECTED, "REJECTED")
2505 RELEASED = DispositionType(PN_RELEASED, "RELEASED")
2506 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
2507
2509 self._impl = impl
2510 self.local = local
2511 self._data = None
2512 self._condition = None
2513 self._annotations = None
2514
2515 @property
2517 return DispositionType.get(pn_disposition_type(self._impl))
2518
2520 return pn_disposition_get_section_number(self._impl)
2522 pn_disposition_set_section_number(self._impl, n)
2523 section_number = property(_get_section_number, _set_section_number)
2524
2526 return pn_disposition_get_section_offset(self._impl)
2528 pn_disposition_set_section_offset(self._impl, n)
2529 section_offset = property(_get_section_offset, _set_section_offset)
2530
2532 return pn_disposition_is_failed(self._impl)
2534 pn_disposition_set_failed(self._impl, b)
2535 failed = property(_get_failed, _set_failed)
2536
2538 return pn_disposition_is_undeliverable(self._impl)
2540 pn_disposition_set_undeliverable(self._impl, b)
2541 undeliverable = property(_get_undeliverable, _set_undeliverable)
2542
2544 if self.local:
2545 return self._data
2546 else:
2547 return dat2obj(pn_disposition_data(self._impl))
2549 if self.local:
2550 self._data = obj
2551 else:
2552 raise AttributeError("data attribute is read-only")
2553 data = property(_get_data, _set_data)
2554
2556 if self.local:
2557 return self._annotations
2558 else:
2559 return dat2obj(pn_disposition_annotations(self._impl))
2561 if self.local:
2562 self._annotations = obj
2563 else:
2564 raise AttributeError("annotations attribute is read-only")
2565 annotations = property(_get_annotations, _set_annotations)
2566
2568 if self.local:
2569 return self._condition
2570 else:
2571 return cond2obj(pn_disposition_condition(self._impl))
2573 if self.local:
2574 self._condition = obj
2575 else:
2576 raise AttributeError("condition attribute is read-only")
2577 condition = property(_get_condition, _set_condition)
2578
2580 """
2581 Tracks and/or records the delivery of a message over a link.
2582 """
2583
2584 RECEIVED = Disposition.RECEIVED
2585 ACCEPTED = Disposition.ACCEPTED
2586 REJECTED = Disposition.REJECTED
2587 RELEASED = Disposition.RELEASED
2588 MODIFIED = Disposition.MODIFIED
2589
2590 @staticmethod
2592 if impl is None:
2593 return None
2594 else:
2595 return Delivery(impl)
2596
2599
2601 self.local = Disposition(pn_delivery_local(self._impl), True)
2602 self.remote = Disposition(pn_delivery_remote(self._impl), False)
2603
2604 @property
2606 """The identifier for the delivery."""
2607 return pn_delivery_tag(self._impl)
2608
2609 @property
2611 """Returns true for an outgoing delivery to which data can now be written."""
2612 return pn_delivery_writable(self._impl)
2613
2614 @property
2616 """Returns true for an incoming delivery that has data to read."""
2617 return pn_delivery_readable(self._impl)
2618
2619 @property
2621 """Returns true if the state of the delivery has been updated
2622 (e.g. it has been settled and/or accepted, rejected etc)."""
2623 return pn_delivery_updated(self._impl)
2624
2626 """
2627 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
2628 """
2629 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
2630 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
2631 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
2632 pn_delivery_update(self._impl, state)
2633
2634 @property
2636 return pn_delivery_pending(self._impl)
2637
2638 @property
2640 """
2641 Returns true for an incoming delivery if not all the data is
2642 yet available.
2643 """
2644 return pn_delivery_partial(self._impl)
2645
2646 @property
2648 """Returns the local state of the delivery."""
2649 return DispositionType.get(pn_delivery_local_state(self._impl))
2650
2651 @property
2653 """
2654 Returns the state of the delivery as indicated by the remote
2655 peer.
2656 """
2657 return DispositionType.get(pn_delivery_remote_state(self._impl))
2658
2659 @property
2661 """
2662 Returns true if the delivery has been settled by the remote peer.
2663 """
2664 return pn_delivery_settled(self._impl)
2665
2667 """
2668 Settles the delivery locally. This indicates the application
2669 considers the delivery complete and does not wish to receive any
2670 further events about it. Every delivery should be settled locally.
2671 """
2672 pn_delivery_settle(self._impl)
2673
2674 @property
2676 """Returns true if the delivery has been aborted."""
2677 return pn_delivery_aborted(self._impl)
2678
2680 """
2681 Aborts the delivery. This indicates the application wishes to
2682 invalidate any data that may have already been sent on this delivery.
2683 The delivery cannot be aborted after it has been completely delivered.
2684 """
2685 pn_delivery_abort(self._impl)
2686
2687 @property
2690
2691 @property
2693 """
2694 Returns the link on which the delivery was sent or received.
2695 """
2696 return Link.wrap(pn_delivery_link(self._impl))
2697
2698 @property
2700 """
2701 Returns the session over which the delivery was sent or received.
2702 """
2703 return self.link.session
2704
2705 @property
2707 """
2708 Returns the connection over which the delivery was sent or received.
2709 """
2710 return self.session.connection
2711
2712 @property
2715
2718
2720
2723
2724 - def __call__(self, trans_impl, message):
2726
2728
2729 TRACE_OFF = PN_TRACE_OFF
2730 TRACE_DRV = PN_TRACE_DRV
2731 TRACE_FRM = PN_TRACE_FRM
2732 TRACE_RAW = PN_TRACE_RAW
2733
2734 CLIENT = 1
2735 SERVER = 2
2736
2737 @staticmethod
2739 if impl is None:
2740 return None
2741 else:
2742 return Transport(_impl=impl)
2743
2744 - def __init__(self, mode=None, _impl = pn_transport):
2752
2754 self._sasl = None
2755 self._ssl = None
2756
2758 if err < 0:
2759 exc = EXCEPTIONS.get(err, TransportException)
2760 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
2761 else:
2762 return err
2763
2765 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
2766
2768 adapter = pn_transport_get_pytracer(self._impl)
2769 if adapter:
2770 return adapter.tracer
2771 else:
2772 return None
2773
2774 tracer = property(_get_tracer, _set_tracer,
2775 doc="""
2776 A callback for trace logging. The callback is passed the transport and log message.
2777 """)
2778
2779 - def log(self, message):
2780 pn_transport_log(self._impl, message)
2781
2783 pn_transport_require_auth(self._impl, bool)
2784
2785 @property
2787 return pn_transport_is_authenticated(self._impl)
2788
2790 pn_transport_require_encryption(self._impl, bool)
2791
2792 @property
2794 return pn_transport_is_encrypted(self._impl)
2795
2796 @property
2798 return pn_transport_get_user(self._impl)
2799
2800 - def bind(self, connection):
2801 """Assign a connection to the transport"""
2802 self._check(pn_transport_bind(self._impl, connection._impl))
2803
2805 """Release the connection"""
2806 self._check(pn_transport_unbind(self._impl))
2807
2809 pn_transport_trace(self._impl, n)
2810
2811 - def tick(self, now):
2812 """Process any timed events (like heartbeat generation).
2813 now = seconds since epoch (float).
2814 """
2815 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
2816
2818 c = pn_transport_capacity(self._impl)
2819 if c >= PN_EOS:
2820 return c
2821 else:
2822 return self._check(c)
2823
2824 - def push(self, binary):
2825 n = self._check(pn_transport_push(self._impl, binary))
2826 if n != len(binary):
2827 raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
2828
2830 self._check(pn_transport_close_tail(self._impl))
2831
2833 p = pn_transport_pending(self._impl)
2834 if p >= PN_EOS:
2835 return p
2836 else:
2837 return self._check(p)
2838
2839 - def peek(self, size):
2840 cd, out = pn_transport_peek(self._impl, size)
2841 if cd == PN_EOS:
2842 return None
2843 else:
2844 self._check(cd)
2845 return out
2846
2847 - def pop(self, size):
2848 pn_transport_pop(self._impl, size)
2849
2851 self._check(pn_transport_close_head(self._impl))
2852
2853 @property
2855 return pn_transport_closed(self._impl)
2856
2857
2859 return pn_transport_get_max_frame(self._impl)
2860
2862 pn_transport_set_max_frame(self._impl, value)
2863
2864 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
2865 doc="""
2866 Sets the maximum size for received frames (in bytes).
2867 """)
2868
2869 @property
2871 return pn_transport_get_remote_max_frame(self._impl)
2872
2874 return pn_transport_get_channel_max(self._impl)
2875
2877 if pn_transport_set_channel_max(self._impl, value):
2878 raise SessionException("Too late to change channel max.")
2879
2880 channel_max = property(_get_channel_max, _set_channel_max,
2881 doc="""
2882 Sets the maximum channel that may be used on the transport.
2883 """)
2884
2885 @property
2887 return pn_transport_remote_channel_max(self._impl)
2888
2889
2891 return millis2secs(pn_transport_get_idle_timeout(self._impl))
2892
2894 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
2895
2896 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
2897 doc="""
2898 The idle timeout of the connection (float, in seconds).
2899 """)
2900
2901 @property
2903 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
2904
2905 @property
2907 return pn_transport_get_frames_output(self._impl)
2908
2909 @property
2912
2915
2916 - def ssl(self, domain=None, session_details=None):
2917
2918 if not self._ssl:
2919 self._ssl = SSL(self, domain, session_details)
2920 return self._ssl
2921
2922 @property
2924 return cond2obj(pn_transport_condition(self._impl))
2925
2926 @property
2929
2932
2933 -class SASL(Wrapper):
2934
2935 OK = PN_SASL_OK
2936 AUTH = PN_SASL_AUTH
2937 SYS = PN_SASL_SYS
2938 PERM = PN_SASL_PERM
2939 TEMP = PN_SASL_TEMP
2940
2941 @staticmethod
2943 return pn_sasl_extended()
2944
2948
2950 if err < 0:
2951 exc = EXCEPTIONS.get(err, SASLException)
2952 raise exc("[%s]" % (err))
2953 else:
2954 return err
2955
2956 @property
2958 return pn_sasl_get_user(self._sasl)
2959
2960 @property
2962 return pn_sasl_get_mech(self._sasl)
2963
2964 @property
2966 outcome = pn_sasl_outcome(self._sasl)
2967 if outcome == PN_SASL_NONE:
2968 return None
2969 else:
2970 return outcome
2971
2973 pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))
2974
2976 return pn_sasl_get_allow_insecure_mechs(self._sasl)
2977
2979 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
2980
2981 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
2982 doc="""
2983 Allow unencrypted cleartext passwords (PLAIN mech)
2984 """)
2985
2986 - def done(self, outcome):
2987 pn_sasl_done(self._sasl, outcome)
2988
2990 pn_sasl_config_name(self._sasl, name)
2991
2993 pn_sasl_config_path(self._sasl, path)
2994
2997
3000
3001 -class SSLDomain(object):
3002
3003 MODE_CLIENT = PN_SSL_MODE_CLIENT
3004 MODE_SERVER = PN_SSL_MODE_SERVER
3005 VERIFY_PEER = PN_SSL_VERIFY_PEER
3006 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
3007 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
3008
3009 - def __init__(self, mode):
3010 self._domain = pn_ssl_domain(mode)
3011 if self._domain is None:
3012 raise SSLUnavailable()
3013
3014 - def _check(self, err):
3015 if err < 0:
3016 exc = EXCEPTIONS.get(err, SSLException)
3017 raise exc("SSL failure.")
3018 else:
3019 return err
3020
3021 - def set_credentials(self, cert_file, key_file, password):
3022 return self._check( pn_ssl_domain_set_credentials(self._domain,
3023 cert_file, key_file,
3024 password) )
3025 - def set_trusted_ca_db(self, certificate_db):
3026 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
3027 certificate_db) )
3028 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3029 return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
3030 verify_mode,
3031 trusted_CAs) )
3032
3034 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3035
3036 - def __del__(self):
3037 pn_ssl_domain_free(self._domain)
3038
3040
3041 @staticmethod
3043 return pn_ssl_present()
3044
3046 if err < 0:
3047 exc = EXCEPTIONS.get(err, SSLException)
3048 raise exc("SSL failure.")
3049 else:
3050 return err
3051
3052 - def __new__(cls, transport, domain, session_details=None):
3053 """Enforce a singleton SSL object per Transport"""
3054 if transport._ssl:
3055
3056
3057
3058 ssl = transport._ssl
3059 if (domain and (ssl._domain is not domain) or
3060 session_details and (ssl._session_details is not session_details)):
3061 raise SSLException("Cannot re-configure existing SSL object!")
3062 else:
3063 obj = super(SSL, cls).__new__(cls)
3064 obj._domain = domain
3065 obj._session_details = session_details
3066 session_id = None
3067 if session_details:
3068 session_id = session_details.get_session_id()
3069 obj._ssl = pn_ssl( transport._impl )
3070 if obj._ssl is None:
3071 raise SSLUnavailable()
3072 if domain:
3073 pn_ssl_init( obj._ssl, domain._domain, session_id )
3074 transport._ssl = obj
3075 return transport._ssl
3076
3078 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3079 if rc:
3080 return name
3081 return None
3082
3084 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3085 if rc:
3086 return name
3087 return None
3088
3089 SHA1 = PN_SSL_SHA1
3090 SHA256 = PN_SSL_SHA256
3091 SHA512 = PN_SSL_SHA512
3092 MD5 = PN_SSL_MD5
3093
3094 CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
3095 CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
3096 CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
3097 CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
3098 CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
3099 CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME
3100
3102 subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)
3103 return subfield_value
3104
3106 subject = pn_ssl_get_remote_subject(self._ssl)
3107 return subject
3108
3112
3113
3116
3119
3122
3125
3128
3131
3133 rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
3134 if rc == PN_OK:
3135 return fingerprint_str
3136 return None
3137
3138
3141
3144
3148
3152
3155
3156 @property
3158 return pn_ssl_get_remote_subject( self._ssl )
3159
3160 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3161 RESUME_NEW = PN_SSL_RESUME_NEW
3162 RESUME_REUSED = PN_SSL_RESUME_REUSED
3163
3165 return pn_ssl_resume_status( self._ssl )
3166
3168 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3170 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3171 self._check(err)
3172 return utf82unicode(name)
3173 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3174 doc="""
3175 Manage the expected name of the remote peer. Used to authenticate the remote.
3176 """)
3177
3180 """ Unique identifier for the SSL session. Used to resume previous session on a new
3181 SSL connection.
3182 """
3183
3185 self._session_id = session_id
3186
3188 return self._session_id
3189
3190
3191 wrappers = {
3192 "pn_void": lambda x: pn_void2py(x),
3193 "pn_pyref": lambda x: pn_void2py(x),
3194 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
3195 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
3196 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
3197 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
3198 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
3199 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
3200 }
3203
3205 self._impl = pn_collector()
3206
3207 - def put(self, obj, etype):
3208 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3209
3211 return Event.wrap(pn_collector_peek(self._impl))
3212
3214 ev = self.peek()
3215 pn_collector_pop(self._impl)
3216
3218 pn_collector_free(self._impl)
3219 del self._impl
3220
3221 if "TypeExtender" not in globals():
3224 self.number = number
3226 try:
3227 return self.number
3228 finally:
3229 self.number += 1
3230
3232
3233 _lock = threading.Lock()
3234 _extended = TypeExtender(10000)
3235 TYPES = {}
3236
3237 - def __init__(self, name=None, number=None, method=None):
3238 if name is None and number is None:
3239 raise TypeError("extended events require a name")
3240 try:
3241 self._lock.acquire()
3242 if name is None:
3243 name = pn_event_type_name(number)
3244
3245 if number is None:
3246 number = self._extended.next()
3247
3248 if method is None:
3249 method = "on_%s" % name
3250
3251 self.name = name
3252 self.number = number
3253 self.method = method
3254
3255 self.TYPES[number] = self
3256 finally:
3257 self._lock.release()
3258
3261
3268
3270
3271 - def __init__(self, clazz, context, type):
3275
3278
3279 -def _none(x): return None
3280
3281 DELEGATED = Constant("DELEGATED")
3282
3283 -def _core(number, method):
3284 return EventType(number=number, method=method)
3285
3286 -class Event(Wrapper, EventBase):
3287
3288 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
3289 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
3290 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
3291
3292 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
3293
3294 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
3295 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
3296 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
3297 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
3298 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
3299 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
3300 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
3301 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")
3302
3303 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
3304 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
3305 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
3306 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
3307 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
3308 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")
3309
3310 LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
3311 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
3312 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
3313 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
3314 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
3315 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
3316 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
3317 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
3318 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")
3319
3320 DELIVERY = _core(PN_DELIVERY, "on_delivery")
3321
3322 TRANSPORT = _core(PN_TRANSPORT, "on_transport")
3323 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
3324 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
3325 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
3326 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
3327
3328 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
3329 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
3330 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
3331 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
3332 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
3333 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
3334 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
3335
3336 @staticmethod
3337 - def wrap(impl, number=None):
3338 if impl is None:
3339 return None
3340
3341 if number is None:
3342 number = pn_event_type(impl)
3343
3344 event = Event(impl, number)
3345
3346
3347
3348 if pn_event_class(impl) == PN_PYREF and \
3349 isinstance(event.context, EventBase):
3350 return event.context
3351 else:
3352 return event
3353
3357
3360
3364
3365 @property
3367 cls = pn_event_class(self._impl)
3368 if cls:
3369 return pn_class_name(cls)
3370 else:
3371 return None
3372
3373 @property
3375 return WrappedHandler.wrap(pn_event_root(self._impl))
3376
3377 @property
3378 - def context(self):
3379 """Returns the context object associated with the event. The type of this depend on the type of event."""
3380 return wrappers[self.clazz](pn_event_context(self._impl))
3381
3382 - def dispatch(self, handler, type=None):
3391
3392
3393 @property
3395 """Returns the reactor associated with the event."""
3396 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3397
3399 r = self.reactor
3400 if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
3401 return r
3402 else:
3403 return super(Event, self).__getattr__(name)
3404
3405 @property
3407 """Returns the transport associated with the event, or null if none is associated with it."""
3408 return Transport.wrap(pn_event_transport(self._impl))
3409
3410 @property
3412 """Returns the connection associated with the event, or null if none is associated with it."""
3413 return Connection.wrap(pn_event_connection(self._impl))
3414
3415 @property
3417 """Returns the session associated with the event, or null if none is associated with it."""
3418 return Session.wrap(pn_event_session(self._impl))
3419
3420 @property
3422 """Returns the link associated with the event, or null if none is associated with it."""
3423 return Link.wrap(pn_event_link(self._impl))
3424
3425 @property
3427 """Returns the sender link associated with the event, or null if
3428 none is associated with it. This is essentially an alias for
3429 link(), that does an additional checkon the type of the
3430 link."""
3431 l = self.link
3432 if l and l.is_sender:
3433 return l
3434 else:
3435 return None
3436
3437 @property
3439 """Returns the receiver link associated with the event, or null if
3440 none is associated with it. This is essentially an alias for
3441 link(), that does an additional checkon the type of the link."""
3442 l = self.link
3443 if l and l.is_receiver:
3444 return l
3445 else:
3446 return None
3447
3448 @property
3450 """Returns the delivery associated with the event, or null if none is associated with it."""
3451 return Delivery.wrap(pn_event_delivery(self._impl))
3452
3455
3458 if obj is None:
3459 return self
3460 ret = []
3461 obj.__dict__['handlers'] = ret
3462 return ret
3463
3469
3471
3472 - def __init__(self, handler, on_error=None):
3475
3479
3485
3488 self.handlers = []
3489 self.delegate = weakref.ref(delegate)
3490
3492 delegate = self.delegate()
3493 if delegate:
3494 dispatch(delegate, method, event)
3495
3499 if obj is None:
3500 return None
3501 return self.surrogate(obj).handlers
3502
3504 self.surrogate(obj).handlers = value
3505
3507 key = "_surrogate"
3508 objdict = obj.__dict__
3509 surrogate = objdict.get(key, None)
3510 if surrogate is None:
3511 objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj)
3512 obj.add(surrogate)
3513 return surrogate
3514
3516
3517 handlers = WrappedHandlersProperty()
3518
3519 @classmethod
3520 - def wrap(cls, impl, on_error=None):
3527
3528 - def __init__(self, impl_or_constructor):
3529 Wrapper.__init__(self, impl_or_constructor)
3530 if list(self.__class__.__mro__).index(WrappedHandler) > 1:
3531
3532 self.handlers.extend([])
3533
3540
3541 - def add(self, handler, on_error=None):
3547
3549 pn_handler_clear(self._impl)
3550
3552 if obj is None:
3553 return None
3554 elif isinstance(obj, WrappedHandler):
3555 impl = obj._impl
3556 pn_incref(impl)
3557 return impl
3558 else:
3559 return pn_pyhandler(_cadapter(obj, on_error))
3560
3562 """
3563 Simple URL parser/constructor, handles URLs of the form:
3564
3565 <scheme>://<user>:<password>@<host>:<port>/<path>
3566
3567 All components can be None if not specified in the URL string.
3568
3569 The port can be specified as a service name, e.g. 'amqp' in the
3570 URL string but Url.port always gives the integer value.
3571
3572 Warning: The placement of user and password in URLs is not
3573 recommended. It can result in credentials leaking out in program
3574 logs. Use connection configuration attributes instead.
3575
3576 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
3577 @ivar user: Username
3578 @ivar password: Password
3579 @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
3580 @ivar port: Integer port.
3581 @ivar host_port: Returns host:port
3582 """
3583
3584 AMQPS = "amqps"
3585 AMQP = "amqp"
3586
3588 """An integer port number that can be constructed from a service name string"""
3589
3591 """@param value: integer port number or string service name."""
3592 port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
3593 setattr(port, 'name', str(value))
3594 return port
3595
3596 - def __eq__(self, x): return str(self) == x or int(self) == x
3597 - def __ne__(self, x): return not self == x
3599
3600 @staticmethod
3602 """Convert service, an integer or a service name, into an integer port number."""
3603 try:
3604 return int(value)
3605 except ValueError:
3606 try:
3607 return socket.getservbyname(value)
3608 except socket.error:
3609
3610 if value == Url.AMQPS: return 5671
3611 elif value == Url.AMQP: return 5672
3612 else:
3613 raise ValueError("Not a valid port number or service name: '%s'" % value)
3614
3615 - def __init__(self, url=None, defaults=True, **kwargs):
3616 """
3617 @param url: URL string to parse.
3618 @param defaults: If true, fill in missing default values in the URL.
3619 If false, you can fill them in later by calling self.defaults()
3620 @param kwargs: scheme, user, password, host, port, path.
3621 If specified, replaces corresponding part in url string.
3622 """
3623 if url:
3624 self._url = pn_url_parse(unicode2utf8(str(url)))
3625 if not self._url: raise ValueError("Invalid URL '%s'" % url)
3626 else:
3627 self._url = pn_url()
3628 for k in kwargs:
3629 getattr(self, k)
3630 setattr(self, k, kwargs[k])
3631 if defaults: self.defaults()
3632
3635 self.getter = globals()["pn_url_get_%s" % part]
3636 self.setter = globals()["pn_url_set_%s" % part]
3637 - def __get__(self, obj, type=None): return self.getter(obj._url)
3638 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
3639
3640 scheme = PartDescriptor('scheme')
3641 username = PartDescriptor('username')
3642 password = PartDescriptor('password')
3643 host = PartDescriptor('host')
3644 path = PartDescriptor('path')
3645
3647 portstr = pn_url_get_port(self._url)
3648 return portstr and Url.Port(portstr)
3649
3651 if value is None: pn_url_set_port(self._url, None)
3652 else: pn_url_set_port(self._url, str(Url.Port(value)))
3653
3654 port = property(_get_port, _set_port)
3655
3656 - def __str__(self): return pn_url_str(self._url)
3657
3660
3661 - def __eq__(self, x): return str(self) == str(x)
3662 - def __ne__(self, x): return not self == x
3663
3665 pn_url_free(self._url);
3666 del self._url
3667
3669 """
3670 Fill in missing values (scheme, host or port) with defaults
3671 @return: self
3672 """
3673 self.scheme = self.scheme or self.AMQP
3674 self.host = self.host or '0.0.0.0'
3675 self.port = self.port or self.Port(self.scheme)
3676 return self
3677
3678 __all__ = [
3679 "API_LANGUAGE",
3680 "IMPLEMENTATION_LANGUAGE",
3681 "ABORTED",
3682 "ACCEPTED",
3683 "PENDING",
3684 "REJECTED",
3685 "RELEASED",
3686 "MODIFIED",
3687 "SETTLED",
3688 "UNDESCRIBED",
3689 "Array",
3690 "Collector",
3691 "Condition",
3692 "Connection",
3693 "Data",
3694 "Delivery",
3695 "Disposition",
3696 "Described",
3697 "Endpoint",
3698 "Event",
3699 "EventType",
3700 "Handler",
3701 "Link",
3702 "Message",
3703 "MessageException",
3704 "ProtonException",
3705 "VERSION_MAJOR",
3706 "VERSION_MINOR",
3707 "Receiver",
3708 "SASL",
3709 "Sender",
3710 "Session",
3711 "SessionException",
3712 "SSL",
3713 "SSLDomain",
3714 "SSLSessionDetails",
3715 "SSLUnavailable",
3716 "SSLException",
3717 "Terminus",
3718 "Timeout",
3719 "Interrupt",
3720 "Transport",
3721 "TransportException",
3722 "Url",
3723 "char",
3724 "dispatch",
3725 "symbol",
3726 "timestamp",
3727 "ulong",
3728 "byte",
3729 "short",
3730 "int32",
3731 "ubyte",
3732 "ushort",
3733 "uint",
3734 "float32",
3735 "decimal32",
3736 "decimal64",
3737 "decimal128"
3738 ]
3739