Package proton :: Module _endpoints
[frames] | no frames]

Source Code for Module proton._endpoints

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  """ 
 21  The proton.endpoints module 
 22  """ 
 23   
 24  from __future__ import absolute_import 
 25   
 26  import weakref 
 27   
 28  from cproton import PN_LOCAL_UNINIT, PN_REMOTE_UNINIT, PN_LOCAL_ACTIVE, PN_REMOTE_ACTIVE, PN_LOCAL_CLOSED, \ 
 29      PN_REMOTE_CLOSED, \ 
 30      pn_object_reactor, pn_record_get_handler, pn_record_set_handler, pn_decref, \ 
 31      pn_connection, pn_connection_attachments, pn_connection_transport, pn_connection_error, pn_connection_condition, \ 
 32      pn_connection_remote_condition, pn_connection_collect, pn_connection_set_container, pn_connection_get_container, \ 
 33      pn_connection_get_hostname, pn_connection_set_hostname, pn_connection_get_user, pn_connection_set_user, \ 
 34      pn_connection_set_password, pn_connection_remote_container, pn_connection_remote_hostname, \ 
 35      pn_connection_remote_offered_capabilities, pn_connection_remote_desired_capabilities, \ 
 36      pn_connection_remote_properties, pn_connection_offered_capabilities, pn_connection_desired_capabilities, \ 
 37      pn_connection_properties, pn_connection_open, pn_connection_close, pn_connection_state, pn_connection_release, \ 
 38      pn_session, pn_session_head, pn_session_attachments, pn_session_condition, pn_session_remote_condition, \ 
 39      pn_session_get_incoming_capacity, pn_session_set_incoming_capacity, pn_session_get_outgoing_window, \ 
 40      pn_session_set_outgoing_window, pn_session_incoming_bytes, pn_session_outgoing_bytes, pn_session_open, \ 
 41      pn_session_close, pn_session_next, pn_session_state, pn_session_connection, pn_session_free, \ 
 42      PN_SND_UNSETTLED, PN_SND_SETTLED, PN_SND_MIXED, PN_RCV_FIRST, PN_RCV_SECOND, \ 
 43      pn_link_head, pn_link_is_sender, pn_link_attachments, pn_link_error, pn_link_condition, pn_link_remote_condition, \ 
 44      pn_link_open, pn_link_close, pn_link_state, pn_link_source, pn_link_target, pn_link_remote_source, \ 
 45      pn_link_remote_target, pn_link_session, pn_link_current, pn_link_advance, pn_link_unsettled, pn_link_credit, \ 
 46      pn_link_available, pn_link_queued, pn_link_next, pn_link_name, pn_link_is_receiver, pn_link_remote_snd_settle_mode, \ 
 47      pn_link_remote_rcv_settle_mode, pn_link_snd_settle_mode, pn_link_set_snd_settle_mode, pn_link_rcv_settle_mode, \ 
 48      pn_link_set_rcv_settle_mode, pn_link_get_drain, pn_link_set_drain, pn_link_drained, pn_link_remote_max_message_size, \ 
 49      pn_link_max_message_size, pn_link_set_max_message_size, pn_link_detach, pn_link_free, pn_link_offered, pn_link_send, \ 
 50      pn_link_flow, pn_link_recv, pn_link_drain, pn_link_draining, \ 
 51      pn_sender, pn_receiver, \ 
 52      PN_UNSPECIFIED, PN_SOURCE, PN_TARGET, PN_COORDINATOR, PN_NONDURABLE, PN_CONFIGURATION, \ 
 53      PN_DELIVERIES, PN_DIST_MODE_UNSPECIFIED, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE, PN_EXPIRE_WITH_LINK, \ 
 54      PN_EXPIRE_WITH_SESSION, PN_EXPIRE_WITH_CONNECTION, PN_EXPIRE_NEVER, \ 
 55      pn_terminus_set_durability, pn_terminus_set_timeout, pn_terminus_set_dynamic, pn_terminus_get_type, \ 
 56      pn_terminus_get_durability, pn_terminus_set_type, pn_terminus_get_address, pn_terminus_capabilities, \ 
 57      pn_terminus_set_address, pn_terminus_get_timeout, pn_terminus_filter, pn_terminus_properties, \ 
 58      pn_terminus_get_expiry_policy, pn_terminus_set_expiry_policy, pn_terminus_set_distribution_mode, \ 
 59      pn_terminus_get_distribution_mode, pn_terminus_copy, pn_terminus_outcomes, pn_terminus_is_dynamic, \ 
 60      PN_EOS, \ 
 61      pn_delivery, \ 
 62      pn_work_head, \ 
 63      pn_error_code, pn_error_text 
 64   
 65  from ._common import utf82unicode, unicode2utf8 
 66  from ._condition import obj2cond, cond2obj 
 67  from ._data import Data, obj2dat, dat2obj 
 68  from ._delivery import Delivery 
 69  from ._exceptions import EXCEPTIONS, LinkException, SessionException, ConnectionException 
 70  from ._transport import Transport 
 71  from ._wrapper import Wrapper 
72 73 74 -class Endpoint(object):
75 LOCAL_UNINIT = PN_LOCAL_UNINIT 76 REMOTE_UNINIT = PN_REMOTE_UNINIT 77 LOCAL_ACTIVE = PN_LOCAL_ACTIVE 78 REMOTE_ACTIVE = PN_REMOTE_ACTIVE 79 LOCAL_CLOSED = PN_LOCAL_CLOSED 80 REMOTE_CLOSED = PN_REMOTE_CLOSED 81
82 - def _init(self):
83 self.condition = None
84
85 - def _update_cond(self):
86 obj2cond(self.condition, self._get_cond_impl())
87 88 @property
89 - def remote_condition(self):
90 return cond2obj(self._get_remote_cond_impl())
91 92 # the following must be provided by subclasses
93 - def _get_cond_impl(self):
94 assert False, "Subclass must override this!"
95
96 - def _get_remote_cond_impl(self):
97 assert False, "Subclass must override this!"
98
99 - def _get_handler(self):
100 from . import reactor 101 from . import _reactor_impl 102 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 103 if ractor: 104 on_error = ractor.on_error_delegate() 105 else: 106 on_error = None 107 record = self._get_attachments() 108 return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error)
109
110 - def _set_handler(self, handler):
111 from . import reactor 112 from . import _reactor_impl 113 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 114 if ractor: 115 on_error = ractor.on_error_delegate() 116 else: 117 on_error = None 118 impl = _reactor_impl._chandler(handler, on_error) 119 record = self._get_attachments() 120 pn_record_set_handler(record, impl) 121 pn_decref(impl)
122 123 handler = property(_get_handler, _set_handler) 124 125 @property
126 - def transport(self):
127 return self.connection.transport
128
129 130 -class Connection(Wrapper, Endpoint):
131 """ 132 A representation of an AMQP connection 133 """ 134 135 @staticmethod
136 - def wrap(impl):
137 if impl is None: 138 return None 139 else: 140 return Connection(impl)
141
142 - def __init__(self, impl=pn_connection):
143 Wrapper.__init__(self, impl, pn_connection_attachments)
144
145 - def _init(self):
146 Endpoint._init(self) 147 self.offered_capabilities = None 148 self.desired_capabilities = None 149 self.properties = None
150
151 - def _get_attachments(self):
152 return pn_connection_attachments(self._impl)
153 154 @property
155 - def connection(self):
156 return self
157 158 @property
159 - def transport(self):
160 return Transport.wrap(pn_connection_transport(self._impl))
161
162 - def _check(self, err):
163 if err < 0: 164 exc = EXCEPTIONS.get(err, ConnectionException) 165 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) 166 else: 167 return err
168
169 - def _get_cond_impl(self):
170 return pn_connection_condition(self._impl)
171
172 - def _get_remote_cond_impl(self):
173 return pn_connection_remote_condition(self._impl)
174
175 - def collect(self, collector):
176 if collector is None: 177 pn_connection_collect(self._impl, None) 178 else: 179 pn_connection_collect(self._impl, collector._impl) 180 self._collector = weakref.ref(collector)
181
182 - def _get_container(self):
183 return utf82unicode(pn_connection_get_container(self._impl))
184
185 - def _set_container(self, name):
186 return pn_connection_set_container(self._impl, unicode2utf8(name))
187 188 container = property(_get_container, _set_container) 189
190 - def _get_hostname(self):
191 return utf82unicode(pn_connection_get_hostname(self._impl))
192
193 - def _set_hostname(self, name):
194 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
195 196 hostname = property(_get_hostname, _set_hostname, 197 doc=""" 198 Set the name of the host (either fully qualified or relative) to which this 199 connection is connecting to. This information may be used by the remote 200 peer to determine the correct back-end service to connect the client to. 201 This value will be sent in the Open performative, and will be used by SSL 202 and SASL layers to identify the peer. 203 """) 204
205 - def _get_user(self):
206 return utf82unicode(pn_connection_get_user(self._impl))
207
208 - def _set_user(self, name):
209 return pn_connection_set_user(self._impl, unicode2utf8(name))
210 211 user = property(_get_user, _set_user) 212
213 - def _get_password(self):
214 return None
215
216 - def _set_password(self, name):
217 return pn_connection_set_password(self._impl, unicode2utf8(name))
218 219 password = property(_get_password, _set_password) 220 221 @property
222 - def remote_container(self):
223 """The container identifier specified by the remote peer for this connection.""" 224 return pn_connection_remote_container(self._impl)
225 226 @property
227 - def remote_hostname(self):
228 """The hostname specified by the remote peer for this connection.""" 229 return pn_connection_remote_hostname(self._impl)
230 231 @property
233 """The capabilities offered by the remote peer for this connection.""" 234 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
235 236 @property
238 """The capabilities desired by the remote peer for this connection.""" 239 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
240 241 @property
242 - def remote_properties(self):
243 """The properties specified by the remote peer for this connection.""" 244 return dat2obj(pn_connection_remote_properties(self._impl))
245
246 - def open(self):
247 """ 248 Opens the connection. 249 250 In more detail, this moves the local state of the connection to 251 the ACTIVE state and triggers an open frame to be sent to the 252 peer. A connection is fully active once both peers have opened it. 253 """ 254 obj2dat(self.offered_capabilities, 255 pn_connection_offered_capabilities(self._impl)) 256 obj2dat(self.desired_capabilities, 257 pn_connection_desired_capabilities(self._impl)) 258 obj2dat(self.properties, pn_connection_properties(self._impl)) 259 pn_connection_open(self._impl)
260
261 - def close(self):
262 """ 263 Closes the connection. 264 265 In more detail, this moves the local state of the connection to 266 the CLOSED state and triggers a close frame to be sent to the 267 peer. A connection is fully closed once both peers have closed it. 268 """ 269 self._update_cond() 270 pn_connection_close(self._impl) 271 if hasattr(self, '_session_policy'): 272 # break circular ref 273 del self._session_policy
274 275 @property
276 - def state(self):
277 """ 278 The state of the connection as a bit field. The state has a local 279 and a remote component. Each of these can be in one of three 280 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking 281 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, 282 REMOTE_ACTIVE and REMOTE_CLOSED. 283 """ 284 return pn_connection_state(self._impl)
285
286 - def session(self):
287 """ 288 Returns a new session on this connection. 289 """ 290 ssn = pn_session(self._impl) 291 if ssn is None: 292 raise (SessionException("Session allocation failed.")) 293 else: 294 return Session(ssn)
295
296 - def session_head(self, mask):
297 return Session.wrap(pn_session_head(self._impl, mask))
298 301 302 @property
303 - def work_head(self):
304 return Delivery.wrap(pn_work_head(self._impl))
305 306 @property
307 - def error(self):
308 return pn_error_code(pn_connection_error(self._impl))
309
310 - def free(self):
311 pn_connection_release(self._impl)
312
313 314 -class Session(Wrapper, Endpoint):
315 316 @staticmethod
317 - def wrap(impl):
318 if impl is None: 319 return None 320 else: 321 return Session(impl)
322
323 - def __init__(self, impl):
324 Wrapper.__init__(self, impl, pn_session_attachments)
325
326 - def _get_attachments(self):
327 return pn_session_attachments(self._impl)
328
329 - def _get_cond_impl(self):
330 return pn_session_condition(self._impl)
331
332 - def _get_remote_cond_impl(self):
333 return pn_session_remote_condition(self._impl)
334
335 - def _get_incoming_capacity(self):
336 return pn_session_get_incoming_capacity(self._impl)
337
338 - def _set_incoming_capacity(self, capacity):
339 pn_session_set_incoming_capacity(self._impl, capacity)
340 341 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) 342
343 - def _get_outgoing_window(self):
344 return pn_session_get_outgoing_window(self._impl)
345
346 - def _set_outgoing_window(self, window):
347 pn_session_set_outgoing_window(self._impl, window)
348 349 outgoing_window = property(_get_outgoing_window, _set_outgoing_window) 350 351 @property
352 - def outgoing_bytes(self):
353 return pn_session_outgoing_bytes(self._impl)
354 355 @property
356 - def incoming_bytes(self):
357 return pn_session_incoming_bytes(self._impl)
358
359 - def open(self):
360 pn_session_open(self._impl)
361
362 - def close(self):
363 self._update_cond() 364 pn_session_close(self._impl)
365
366 - def next(self, mask):
367 return Session.wrap(pn_session_next(self._impl, mask))
368 369 @property
370 - def state(self):
371 return pn_session_state(self._impl)
372 373 @property
374 - def connection(self):
375 return Connection.wrap(pn_session_connection(self._impl))
376
377 - def sender(self, name):
378 return Sender(pn_sender(self._impl, unicode2utf8(name)))
379
380 - def receiver(self, name):
381 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
382
383 - def free(self):
384 pn_session_free(self._impl)
385 586
587 588 -class Sender(Link):
589 """ 590 A link over which messages are sent. 591 """ 592
593 - def offered(self, n):
594 pn_link_offered(self._impl, n)
595
596 - def stream(self, data):
597 """ 598 Send specified data as part of the current delivery 599 600 @type data: binary 601 @param data: data to send 602 """ 603 return self._check(pn_link_send(self._impl, data))
604
605 - def send(self, obj, tag=None):
606 """ 607 Send specified object over this sender; the object is expected to 608 have a send() method on it that takes the sender and an optional 609 tag as arguments. 610 611 Where the object is a Message, this will send the message over 612 this link, creating a new delivery for the purpose. 613 """ 614 if hasattr(obj, 'send'): 615 return obj.send(self, tag=tag) 616 else: 617 # treat object as bytes 618 return self.stream(obj)
619
620 - def delivery_tag(self):
621 if not hasattr(self, 'tag_generator'): 622 def simple_tags(): 623 count = 1 624 while True: 625 yield str(count) 626 count += 1
627 628 self.tag_generator = simple_tags() 629 return next(self.tag_generator)
630
631 632 -class Receiver(Link):
633 """ 634 A link over which messages are received. 635 """ 636
637 - def flow(self, n):
638 """Increases the credit issued to the remote sender by the specified number of messages.""" 639 pn_link_flow(self._impl, n)
640
641 - def recv(self, limit):
642 n, binary = pn_link_recv(self._impl, limit) 643 if n == PN_EOS: 644 return None 645 else: 646 self._check(n) 647 return binary
648
649 - def drain(self, n):
650 pn_link_drain(self._impl, n)
651
652 - def draining(self):
653 return pn_link_draining(self._impl)
654
655 656 -class Terminus(object):
657 UNSPECIFIED = PN_UNSPECIFIED 658 SOURCE = PN_SOURCE 659 TARGET = PN_TARGET 660 COORDINATOR = PN_COORDINATOR 661 662 NONDURABLE = PN_NONDURABLE 663 CONFIGURATION = PN_CONFIGURATION 664 DELIVERIES = PN_DELIVERIES 665 666 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED 667 DIST_MODE_COPY = PN_DIST_MODE_COPY 668 DIST_MODE_MOVE = PN_DIST_MODE_MOVE 669 670 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK 671 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION 672 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION 673 EXPIRE_NEVER = PN_EXPIRE_NEVER 674
675 - def __init__(self, impl):
676 self._impl = impl
677
678 - def _check(self, err):
679 if err < 0: 680 exc = EXCEPTIONS.get(err, LinkException) 681 raise exc("[%s]" % err) 682 else: 683 return err
684
685 - def _get_type(self):
686 return pn_terminus_get_type(self._impl)
687
688 - def _set_type(self, type):
689 self._check(pn_terminus_set_type(self._impl, type))
690 691 type = property(_get_type, _set_type) 692
693 - def _get_address(self):
694 """The address that identifies the source or target node""" 695 return utf82unicode(pn_terminus_get_address(self._impl))
696
697 - def _set_address(self, address):
698 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
699 700 address = property(_get_address, _set_address) 701
702 - def _get_durability(self):
703 return pn_terminus_get_durability(self._impl)
704
705 - def _set_durability(self, seconds):
706 self._check(pn_terminus_set_durability(self._impl, seconds))
707 708 durability = property(_get_durability, _set_durability) 709
710 - def _get_expiry_policy(self):
711 return pn_terminus_get_expiry_policy(self._impl)
712
713 - def _set_expiry_policy(self, seconds):
714 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
715 716 expiry_policy = property(_get_expiry_policy, _set_expiry_policy) 717
718 - def _get_timeout(self):
719 return pn_terminus_get_timeout(self._impl)
720
721 - def _set_timeout(self, seconds):
722 self._check(pn_terminus_set_timeout(self._impl, seconds))
723 724 timeout = property(_get_timeout, _set_timeout) 725
726 - def _is_dynamic(self):
727 """Indicates whether the source or target node was dynamically 728 created""" 729 return pn_terminus_is_dynamic(self._impl)
730
731 - def _set_dynamic(self, dynamic):
732 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
733 734 dynamic = property(_is_dynamic, _set_dynamic) 735
736 - def _get_distribution_mode(self):
737 return pn_terminus_get_distribution_mode(self._impl)
738
739 - def _set_distribution_mode(self, mode):
740 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
741 742 distribution_mode = property(_get_distribution_mode, _set_distribution_mode) 743 744 @property
745 - def properties(self):
746 """Properties of a dynamic source or target.""" 747 return Data(pn_terminus_properties(self._impl))
748 749 @property
750 - def capabilities(self):
751 """Capabilities of the source or target.""" 752 return Data(pn_terminus_capabilities(self._impl))
753 754 @property
755 - def outcomes(self):
756 return Data(pn_terminus_outcomes(self._impl))
757 758 @property
759 - def filter(self):
760 """A filter on a source allows the set of messages transfered over 761 the link to be restricted""" 762 return Data(pn_terminus_filter(self._impl))
763
764 - def copy(self, src):
765 self._check(pn_terminus_copy(self._impl, src._impl))
766