1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 import logging
23 import time
24 import weakref
25 from select import select
26
27 from proton import Delivery, Endpoint
28 from proton import Message, Handler, ProtonException
29 from ._events import dispatch
30
31 log = logging.getLogger("proton")
35 """
36 A utility for simpler and more intuitive handling of delivery
37 events related to outgoing i.e. sent messages.
38 """
39
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the handler configuration.

    :param auto_settle: flag stored on the instance as ``auto_settle``.
    :param delegate: optional object to which the outgoing-delivery
        callbacks of this handler are dispatched; when ``None`` no
        dispatch takes place.
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
43
49
63
65 """
66 Called when the sender link has credit and messages can
67 therefore be transferred.
68 """
69 if self.delegate is not None:
70 dispatch(self.delegate, 'on_sendable', event)
71
73 """
74 Called when the remote peer accepts an outgoing message.
75 """
76 if self.delegate is not None:
77 dispatch(self.delegate, 'on_accepted', event)
78
80 """
81 Called when the remote peer rejects an outgoing message.
82 """
83 if self.delegate is not None:
84 dispatch(self.delegate, 'on_rejected', event)
85
87 """
88 Called when the remote peer releases an outgoing message. Note
89 that this may be in response to either the RELEASE or MODIFIED
90 state as defined by the AMQP specification.
91 """
92 if self.delegate is not None:
93 dispatch(self.delegate, 'on_released', event)
94
96 """
97 Called when the remote peer has settled the outgoing
98 message. This is the point at which it should never be
99 retransmitted.
100 """
101 if self.delegate is not None:
102 dispatch(self.delegate, 'on_settled', event)
103
110
111
class Reject(ProtonException):
    """
    Exception signalling that a message should be rejected.
    """
117
118
class Release(ProtonException):
    """
    An exception that indicates a message should be released.
    """
    pass
124
128 """
129 Accepts a received message.
130
131 Note that this method cannot currently be used in combination
132 with transactions.
133 """
134 self.settle(delivery, Delivery.ACCEPTED)
135
142
def release(self, delivery, delivered=True):
    """
    Release a received message so that the source can offer it to
    any (other) interested receiver.

    :param delivery: the delivery to settle.
    :param delivered: when true the delivery is settled with the
        ``MODIFIED`` state, i.e. it counts as a delivery attempt
        (and the delivery count is updated); otherwise it is settled
        with the ``RELEASED`` state.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
154
155 - def settle(self, delivery, state=None):
159
162 """
163 A utility for simpler and more intuitive handling of delivery
164 events related to incoming i.e. received messages.
165 """
166
def __init__(self, auto_accept=True, delegate=None):
    """
    Record the handler configuration.

    :param auto_accept: flag stored on the instance as ``auto_accept``.
    :param delegate: optional object to which the incoming-delivery
        callbacks of this handler are dispatched; when ``None`` no
        dispatch takes place.
    """
    self.auto_accept = auto_accept
    self.delegate = delegate
170
197
199 """
200 Called when a message is received. The message itself can be
201 obtained as a property on the event. For the purpose of
202 referring to this message in further actions (e.g. if
203 explicitly accepting it), the ``delivery`` should be used, also
204 obtainable via a property on the event.
205 """
206 if self.delegate is not None:
207 dispatch(self.delegate, 'on_message', event)
208
210 if self.delegate is not None:
211 dispatch(self.delegate, 'on_settled', event)
212
214 if self.delegate is not None:
215 dispatch(self.delegate, 'on_aborted', event)
216
219 """
220 A utility that exposes 'endpoint' events i.e. the open/close for
221 links, sessions and connections in a more intuitive manner. A
222 XXX_opened method will be called when both local and remote peers
223 have opened the link, session or connection. This can be used to
224 confirm a locally initiated action for example. A XXX_opening
225 method will be called when the remote peer has requested an open
226 that was not initiated locally. By default this will simply open
227 locally, which then triggers the XXX_opened call. The same applies
228 to close.
229 """
230
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Record the handler configuration.

    :param peer_close_is_error: when true and no delegate is set, a
        close initiated by the peer is routed to the matching
        ``on_*_error`` callback.
    :param delegate: optional object to which the endpoint callbacks
        of this handler are dispatched; when ``None`` no dispatch
        takes place.
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
234
235 @classmethod
238
239 @classmethod
242
243 @classmethod
246
247 @classmethod
250
251 @classmethod
254
255 @classmethod
261
270
279
292
296
303
307
314
318
325
327 if self.delegate is not None:
328 dispatch(self.delegate, 'on_connection_opened', event)
329
331 if self.delegate is not None:
332 dispatch(self.delegate, 'on_session_opened', event)
333
335 if self.delegate is not None:
336 dispatch(self.delegate, 'on_link_opened', event)
337
339 if self.delegate is not None:
340 dispatch(self.delegate, 'on_connection_opening', event)
341
343 if self.delegate is not None:
344 dispatch(self.delegate, 'on_session_opening', event)
345
347 if self.delegate is not None:
348 dispatch(self.delegate, 'on_link_opening', event)
349
351 if self.delegate is not None:
352 dispatch(self.delegate, 'on_connection_error', event)
353 else:
354 self.log_error(event.connection, "connection")
355
357 if self.delegate is not None:
358 dispatch(self.delegate, 'on_session_error', event)
359 else:
360 self.log_error(event.session, "session")
361 event.connection.close()
362
364 if self.delegate is not None:
365 dispatch(self.delegate, 'on_link_error', event)
366 else:
367 self.log_error(event.link, "link")
368 event.connection.close()
369
371 if self.delegate is not None:
372 dispatch(self.delegate, 'on_connection_closed', event)
373
375 if self.delegate is not None:
376 dispatch(self.delegate, 'on_session_closed', event)
377
379 if self.delegate is not None:
380 dispatch(self.delegate, 'on_link_closed', event)
381
383 if self.delegate is not None:
384 dispatch(self.delegate, 'on_connection_closing', event)
385 elif self.peer_close_is_error:
386 self.on_connection_error(event)
387
389 if self.delegate is not None:
390 dispatch(self.delegate, 'on_session_closing', event)
391 elif self.peer_close_is_error:
392 self.on_session_error(event)
393
395 if self.delegate is not None:
396 dispatch(self.delegate, 'on_link_closing', event)
397 elif self.peer_close_is_error:
398 self.on_link_error(event)
399
402
406
409 """
410 A general purpose handler that makes the proton-c events somewhat
411 simpler to deal with and/or avoids repetitive tasks for common use
412 cases.
413 """
414
415 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
423
441
447
454
461
463 """
464 Called when the event loop - the reactor - starts.
465 """
466 if hasattr(event.reactor, 'subclass'):
467 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
468 self.on_start(event)
469
471 """
472 Called when the event loop starts. (Just an alias for on_reactor_init)
473 """
474 pass
475
477 """
478 Called when the connection is closed.
479 """
480 pass
481
483 """
484 Called when the session is closed.
485 """
486 pass
487
489 """
490 Called when the link is closed.
491 """
492 pass
493
495 """
496 Called when the peer initiates the closing of the connection.
497 """
498 pass
499
501 """
502 Called when the peer initiates the closing of the session.
503 """
504 pass
505
507 """
508 Called when the peer initiates the closing of the link.
509 """
510 pass
511
513 """
514 Called when the socket is disconnected.
515 """
516 pass
517
519 """
520 Called when the sender link has credit and messages can
521 therefore be transferred.
522 """
523 pass
524
526 """
527 Called when the remote peer accepts an outgoing message.
528 """
529 pass
530
532 """
533 Called when the remote peer rejects an outgoing message.
534 """
535 pass
536
538 """
539 Called when the remote peer releases an outgoing message. Note
540 that this may be in response to either the RELEASE or MODIFIED
541 state as defined by the AMQP specification.
542 """
543 pass
544
546 """
547 Called when the remote peer has settled the outgoing
548 message. This is the point at which it should never be
549 retransmitted.
550 """
551 pass
552
554 """
555 Called when a message is received. The message itself can be
556 obtained as a property on the event. For the purpose of
557 referring to this message in further actions (e.g. if
558 explicitly accepting it), the ``delivery`` should be used, also
559 obtainable via a property on the event.
560 """
561 pass
562
565 """
566 The interface for transaction handlers, i.e. objects that want to
567 be notified of state changes related to a transaction.
568 """
569
572
575
578
581
584
587 """
588 An extension to the MessagingHandler for applications using
589 transactions.
590 """
591
592 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
594
595 - def accept(self, delivery, transaction=None):
600
604 self._window = window
605 self._drained = 0
606
608 self._flow(event.link)
609
611 self._flow(event.link)
612
614 self._flow(event.link)
615
617 self._flow(event.link)
618
625
628
629 @staticmethod
634
635 @staticmethod
640
641 @staticmethod
648
649 @staticmethod
654
655 @staticmethod
660
661 @staticmethod
666
667
668
669 CFlowController = FlowController
670 CHandshaker = Handshaker
671
672
673 from ._events import WrappedHandler
674 from cproton import pn_iohandler
680
683
685 self.selectables = []
686 self.delegate = IOHandler()
687
690
692 self.selectables.append(event.context)
693
696
702
744