Package proton :: Module handlers
[frames | no frames]

Source Code for Module proton.handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19   
 20  from __future__ import absolute_import 
 21   
 22  import logging 
 23  import time 
 24  import weakref 
 25  from select import select 
 26   
 27  from proton import Delivery, Endpoint 
 28  from proton import Message, Handler, ProtonException 
 29  from ._events import dispatch 
 30   
 31  log = logging.getLogger("proton") 
32 33 34 -class OutgoingMessageHandler(Handler):
35 """ 36 A utility for simpler and more intuitive handling of delivery 37 events related to outgoing i.e. sent messages. 38 """ 39
def __init__(self, auto_settle=True, delegate=None):
    """
    Store the handler configuration.

    ``auto_settle`` is read by ``on_delivery`` to decide whether to
    settle deliveries locally. ``delegate`` is the object the
    ``on_*`` callbacks of this handler are dispatched to; ``None``
    turns those callbacks into no-ops.
    """
    self.delegate = delegate
    self.auto_settle = auto_settle
43 49
def on_delivery(self, event):
    """
    Map an updated delivery on a sender link onto the matching
    ``on_accepted``, ``on_rejected``, ``on_released`` and
    ``on_settled`` callbacks.

    Deliveries on links that are not senders, and deliveries that
    have not been updated, are ignored.
    """
    delivery = event.delivery
    if not (delivery.link.is_sender and delivery.updated):
        return

    outcome = delivery.remote_state
    if outcome == Delivery.ACCEPTED:
        self.on_accepted(event)
    elif outcome == Delivery.REJECTED:
        self.on_rejected(event)
    elif outcome in (Delivery.RELEASED, Delivery.MODIFIED):
        # Both RELEASED and MODIFIED are reported as "released".
        self.on_released(event)

    if delivery.settled:
        self.on_settled(event)
    # NOTE(review): the source listing lost its indentation; this check
    # is assumed to sit beside the ``settled`` check, not inside it.
    if self.auto_settle:
        delivery.settle()
63
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred.

    Forwards the event to the delegate's ``on_sendable``; does
    nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_sendable', event)
71
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.

    Forwards the event to the delegate's ``on_accepted``; does
    nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_accepted', event)
78
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.

    Forwards the event to the delegate's ``on_rejected``; does
    nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_rejected', event)
85
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASED or MODIFIED
    state as defined by the AMQP specification.

    Forwards the event to the delegate's ``on_released``; does
    nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_released', event)
94
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted.

    Forwards the event to the delegate's ``on_settled``; does
    nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_settled', event)
103
def recv_msg(delivery):
    """
    Decode the pending bytes of ``delivery`` into a new ``Message``.

    Reads ``delivery.pending`` bytes from the delivery's link, decodes
    them into the message, advances the link and returns the message.
    """
    link = delivery.link
    encoded = link.recv(delivery.pending)
    message = Message()
    message.decode(encoded)
    link.advance()
    return message
110
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
117
class Release(ProtonException):
    """
    An exception that indicates a message should be released.

    ``IncomingMessageHandler.on_delivery`` catches this exception
    around ``on_message`` and answers it by updating the delivery to
    ``Delivery.MODIFIED`` and settling it.
    """
124
125 126 -class Acking(object):
def accept(self, delivery):
    """
    Accepts a received message by settling ``delivery`` with the
    ``Delivery.ACCEPTED`` state.

    Note that this method cannot currently be used in combination
    with transactions.
    """
    outcome = Delivery.ACCEPTED
    self.settle(delivery, outcome)
135
def reject(self, delivery):
    """
    Rejects a received message that is considered invalid or
    unprocessable, by settling ``delivery`` with the
    ``Delivery.REJECTED`` state.
    """
    outcome = Delivery.REJECTED
    self.settle(delivery, outcome)
142
def release(self, delivery, delivered=True):
    """
    Releases a received message, making it available at the source
    for any (other) interested receiver. The ``delivered``
    parameter indicates whether this should be considered a
    delivery attempt (and the delivery count updated) or not.

    A truthy ``delivered`` settles with ``Delivery.MODIFIED``;
    otherwise ``Delivery.RELEASED`` is used.
    """
    outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, outcome)
154
155 - def settle(self, delivery, state=None):
159
class IncomingMessageHandler(Handler, Acking):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to incoming i.e. received messages.
    """

    def __init__(self, auto_accept=True, delegate=None):
        """
        Store the handler configuration.

        ``auto_accept`` is read by ``on_delivery`` to decide whether
        deliveries are updated and settled automatically.
        ``delegate`` receives the dispatched ``on_message``,
        ``on_settled`` and ``on_aborted`` callbacks.
        """
        self.auto_accept = auto_accept
        self.delegate = delegate

    def on_delivery(self, event):
        """
        Turn a delivery event on a receiver link into ``on_aborted``,
        ``on_message`` or ``on_settled``.

        A fully received message is decoded and stored on
        ``event.message``. ``Reject`` and ``Release`` raised while
        handling it settle the delivery as REJECTED or MODIFIED.
        """
        delivery = event.delivery
        if not delivery.link.is_receiver:
            return

        if delivery.aborted:
            self.on_aborted(event)
            delivery.settle()
            return

        if delivery.readable and not delivery.partial:
            event.message = recv_msg(delivery)
            if event.link.state & Endpoint.LOCAL_CLOSED:
                # The link is already closed locally: on_message is
                # skipped and the message is released when auto-accepting.
                if self.auto_accept:
                    delivery.update(Delivery.RELEASED)
                    delivery.settle()
                return
            try:
                self.on_message(event)
                if self.auto_accept:
                    delivery.update(Delivery.ACCEPTED)
                    delivery.settle()
            except Reject:
                delivery.update(Delivery.REJECTED)
                delivery.settle()
            except Release:
                delivery.update(Delivery.MODIFIED)
                delivery.settle()
            return

        if delivery.updated and delivery.settled:
            self.on_settled(event)

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used,
        also obtainable via a property on the event.
        """
        target = self.delegate
        if target is None:
            return
        dispatch(target, 'on_message', event)

    def on_settled(self, event):
        """
        Forward the event to the delegate's ``on_settled``; does
        nothing when no delegate is configured.
        """
        target = self.delegate
        if target is None:
            return
        dispatch(target, 'on_settled', event)

    def on_aborted(self, event):
        """
        Forward the event to the delegate's ``on_aborted``; does
        nothing when no delegate is configured.
        """
        target = self.delegate
        if target is None:
            return
        dispatch(target, 'on_aborted', event)
216
217 218 -class EndpointStateHandler(Handler):
219 """ 220 A utility that exposes 'endpoint' events i.e. the open/close for 221 links, sessions and connections in a more intuitive manner. A 222 XXX_opened method will be called when both local and remote peers 223 have opened the link, session or connection. This can be used to 224 confirm a locally initiated action for example. A XXX_opening 225 method will be called when the remote peer has requested an open 226 that was not initiated locally. By default this will simply open 227 locally, which then triggers the XXX_opened call. The same applies 228 to close. 229 """ 230
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Store the handler configuration.

    ``peer_close_is_error`` is read by the ``on_*_closing`` callbacks
    when no delegate is set. ``delegate`` is the object the
    higher-level callbacks are dispatched to.
    """
    self.peer_close_is_error = peer_close_is_error
    self.delegate = delegate
@classmethod
def is_local_open(cls, endpoint):
    """
    Return ``endpoint.state`` masked with ``Endpoint.LOCAL_ACTIVE``.

    The result is the raw masked value, so it is truthy exactly when
    that flag is set.
    """
    local_active = endpoint.state & Endpoint.LOCAL_ACTIVE
    return local_active
@classmethod
def is_local_uninitialised(cls, endpoint):
    """
    Return ``endpoint.state`` masked with ``Endpoint.LOCAL_UNINIT``.

    The result is the raw masked value, so it is truthy exactly when
    that flag is set.
    """
    local_uninit = endpoint.state & Endpoint.LOCAL_UNINIT
    return local_uninit
@classmethod
def is_local_closed(cls, endpoint):
    """
    Return ``endpoint.state`` masked with ``Endpoint.LOCAL_CLOSED``.

    The result is the raw masked value, so it is truthy exactly when
    that flag is set.
    """
    local_closed = endpoint.state & Endpoint.LOCAL_CLOSED
    return local_closed
@classmethod
def is_remote_open(cls, endpoint):
    """
    Return ``endpoint.state`` masked with ``Endpoint.REMOTE_ACTIVE``.

    The result is the raw masked value, so it is truthy exactly when
    that flag is set.
    """
    remote_active = endpoint.state & Endpoint.REMOTE_ACTIVE
    return remote_active
@classmethod
def is_remote_closed(cls, endpoint):
    """
    Return ``endpoint.state`` masked with ``Endpoint.REMOTE_CLOSED``.

    The result is the raw masked value, so it is truthy exactly when
    that flag is set.
    """
    remote_closed = endpoint.state & Endpoint.REMOTE_CLOSED
    return remote_closed
@classmethod
def print_error(cls, endpoint, endpoint_type):
    """
    Log an error describing why ``endpoint`` was closed.

    When the endpoint carries a remote condition, its description
    (or its name, if the description is empty) is logged. Otherwise
    "<endpoint_type> closed by peer" is logged, but only if the
    endpoint is locally open and remotely closed.
    """
    condition = endpoint.remote_condition
    if condition:
        log.error(condition.description or condition.name)
        return
    if cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
        log.error("%s closed by peer" % endpoint_type)
261 270
def on_session_remote_close(self, event):
    """
    React to the peer closing a session.

    Calls ``on_session_error`` when a remote condition is present,
    ``on_session_closed`` when the session is already closed locally,
    and ``on_session_closing`` otherwise. The session is then closed
    locally.
    """
    if event.session.remote_condition:
        callback = self.on_session_error
    elif self.is_local_closed(event.session):
        callback = self.on_session_closed
    else:
        callback = self.on_session_closing
    callback(event)
    # NOTE(review): indentation was lost in the source listing; the
    # local close is assumed to run on every path.
    event.session.close()
279
def on_connection_remote_close(self, event):
    """
    React to the peer closing the connection.

    Calls ``on_connection_error`` when a remote condition is present,
    ``on_connection_closed`` when the connection is already closed
    locally, and ``on_connection_closing`` otherwise. The connection
    is then closed locally. A remote condition named
    "amqp:connection:forced" is ignored entirely.
    """
    condition = event.connection.remote_condition
    if condition:
        if condition.name == "amqp:connection:forced":
            # Treat this the same as just having the transport closed by the peer without
            # sending any events. Allow reconnection to happen transparently.
            return
        self.on_connection_error(event)
    elif self.is_local_closed(event.connection):
        self.on_connection_closed(event)
    else:
        self.on_connection_closing(event)
    event.connection.close()
292
def on_connection_local_open(self, event):
    """
    Call ``on_connection_opened`` if the remote end of the
    connection is already open when the local end opens.
    """
    if not self.is_remote_open(event.connection):
        return
    self.on_connection_opened(event)
296
def on_connection_remote_open(self, event):
    """
    React to the peer opening the connection.

    If the connection is already open locally, ``on_connection_opened``
    is called. If it is still uninitialised locally,
    ``on_connection_opening`` is called and the connection is opened.
    """
    if self.is_local_open(event.connection):
        self.on_connection_opened(event)
        return
    if self.is_local_uninitialised(event.connection):
        self.on_connection_opening(event)
        event.connection.open()
303
def on_session_local_open(self, event):
    """
    Call ``on_session_opened`` if the remote end of the session is
    already open when the local end opens.
    """
    if not self.is_remote_open(event.session):
        return
    self.on_session_opened(event)
307
def on_session_remote_open(self, event):
    """
    React to the peer opening a session.

    If the session is already open locally, ``on_session_opened`` is
    called. If it is still uninitialised locally,
    ``on_session_opening`` is called and the session is opened.
    """
    if self.is_local_open(event.session):
        self.on_session_opened(event)
        return
    if self.is_local_uninitialised(event.session):
        self.on_session_opening(event)
        event.session.open()
314 318 325
def on_connection_opened(self, event):
    """
    Forward the event to the delegate's ``on_connection_opened``;
    does nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_connection_opened', event)
329
def on_session_opened(self, event):
    """
    Forward the event to the delegate's ``on_session_opened``;
    does nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_session_opened', event)
333 337
def on_connection_opening(self, event):
    """
    Forward the event to the delegate's ``on_connection_opening``;
    does nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_connection_opening', event)
341
def on_session_opening(self, event):
    """
    Forward the event to the delegate's ``on_session_opening``;
    does nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_session_opening', event)
345 349
def on_connection_error(self, event):
    """
    Handle a connection that the peer closed with an error.

    With a delegate configured, the event is dispatched to its
    ``on_connection_error``. Without one, the error is logged through
    ``print_error``.
    """
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_error', event)
    else:
        # The class's logging helper is ``print_error``; the previous
        # ``log_error`` name is not defined in the visible source.
        self.print_error(event.connection, "connection")
355
def on_session_error(self, event):
    """
    Handle a session that the peer closed with an error.

    With a delegate configured, the event is dispatched to its
    ``on_session_error``. Without one, the error is logged through
    ``print_error`` and the connection is closed.
    """
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_error', event)
    else:
        # The class's logging helper is ``print_error``; the previous
        # ``log_error`` name is not defined in the visible source.
        self.print_error(event.session, "session")
        # NOTE(review): indentation was lost in the source listing; the
        # close is assumed to belong to this no-delegate branch.
        event.connection.close()
362 369
def on_connection_closed(self, event):
    """
    Forward the event to the delegate's ``on_connection_closed``;
    does nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_connection_closed', event)
373
def on_session_closed(self, event):
    """
    Forward the event to the delegate's ``on_session_closed``;
    does nothing when no delegate is configured.
    """
    target = self.delegate
    if target is None:
        return
    dispatch(target, 'on_session_closed', event)
377 381
def on_connection_closing(self, event):
    """
    Handle the peer initiating the close of the connection.

    With a delegate configured, the event is dispatched to its
    ``on_connection_closing``. Without one, the close is treated as an
    error only when ``peer_close_is_error`` is set.
    """
    target = self.delegate
    if target is not None:
        dispatch(target, 'on_connection_closing', event)
        return
    if self.peer_close_is_error:
        self.on_connection_error(event)
387
def on_session_closing(self, event):
    """
    Handle the peer initiating the close of a session.

    With a delegate configured, the event is dispatched to its
    ``on_session_closing``. Without one, the close is treated as an
    error only when ``peer_close_is_error`` is set.
    """
    target = self.delegate
    if target is not None:
        dispatch(target, 'on_session_closing', event)
        return
    if self.peer_close_is_error:
        self.on_session_error(event)
393 399
def on_transport_tail_closed(self, event):
    """
    Handle the event exactly as ``on_transport_closed`` does.
    """
    handle_closed = self.on_transport_closed
    handle_closed(event)
402
def on_transport_closed(self, event):
    """
    Dispatch ``on_disconnected`` to the delegate when the transport
    closes.

    Nothing happens unless a delegate is configured, the event has a
    connection, and that connection is still open locally.
    """
    if self.delegate is None:
        return
    connection = event.connection
    if connection and self.is_local_open(connection):
        dispatch(self.delegate, 'on_disconnected', event)
406
407 408 -class MessagingHandler(Handler, Acking):
409 """ 410 A general purpose handler that makes the proton-c events somewhat 411 simpler to deal with and/or avoids repetitive tasks for common use 412 cases. 413 """ 414
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
    """
    Assemble the child handlers that drive this handler.

    A ``FlowController`` is added only when ``prefetch`` is truthy.
    The endpoint-state, incoming and outgoing handlers each receive a
    ``weakref.proxy`` of this object as their delegate, so they do
    not hold a strong reference back to it.
    """
    me = weakref.proxy(self)
    self.handlers = []
    if prefetch:
        self.handlers.append(FlowController(prefetch))
    self.handlers.extend([
        EndpointStateHandler(peer_close_is_error, me),
        IncomingMessageHandler(auto_accept, me),
        OutgoingMessageHandler(auto_settle, me),
    ])
    # Transport conditions that make on_transport_error close the connection.
    self.fatal_conditions = ["amqp:unauthorized-access"]
423
def on_transport_error(self, event):
    """
    Called when some error is encountered with the transport over
    which the AMQP connection is to be established. This includes
    authentication errors as well as socket errors.

    The transport condition's name and description (and its info,
    when present) are logged. If the name is listed in
    ``self.fatal_conditions`` the connection is closed. A transport
    without a condition is logged as an unspecified error.
    """
    condition = event.transport.condition
    if not condition:
        # Use the module logger, like every other message in this file,
        # rather than the root logger.
        log.error("Unspecified transport error")
        return

    if condition.info:
        log.error("%s: %s: %s", condition.name, condition.description, condition.info)
    else:
        log.error("%s: %s", condition.name, condition.description)
    if condition.name in self.fatal_conditions:
        event.connection.close()
441
def on_connection_error(self, event):
    """
    Called when the peer closes the connection with an error condition.

    Logs the error through ``EndpointStateHandler.print_error``.
    """
    connection = event.connection
    EndpointStateHandler.print_error(connection, "connection")
447
def on_session_error(self, event):
    """
    Called when the peer closes the session with an error condition.

    Logs the error through ``EndpointStateHandler.print_error`` and
    then closes the connection.
    """
    session = event.session
    EndpointStateHandler.print_error(session, "session")
    event.connection.close()
454 461
def on_reactor_init(self, event):
    """
    Called when the event loop - the reactor - starts.

    If the reactor has a ``subclass`` attribute, the reactor is also
    attached to the event under that class's lower-cased name.
    ``on_start`` is then called.
    """
    reactor = event.reactor
    if hasattr(reactor, 'subclass'):
        alias = reactor.subclass.__name__.lower()
        setattr(event, alias, reactor)
    self.on_start(event)
469
def on_start(self, event):
    """
    Called when the event loop starts. (Just an alias for on_reactor_init)

    The default implementation does nothing.
    """
475
def on_connection_closed(self, event):
    """
    Called when the connection is closed.

    The default implementation does nothing.
    """
481
def on_session_closed(self, event):
    """
    Called when the session is closed.

    The default implementation does nothing.
    """
487 493
def on_connection_closing(self, event):
    """
    Called when the peer initiates the closing of the connection.

    The default implementation does nothing.
    """
499
def on_session_closing(self, event):
    """
    Called when the peer initiates the closing of the session.

    The default implementation does nothing.
    """
505 511
def on_disconnected(self, event):
    """
    Called when the socket is disconnected.

    The default implementation does nothing.
    """
517
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred.

    The default implementation does nothing.
    """
524
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.

    The default implementation does nothing.
    """
530
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.

    The default implementation does nothing.
    """
536
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASED or MODIFIED
    state as defined by the AMQP specification.

    The default implementation does nothing.
    """
544
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted.

    The default implementation does nothing.
    """
552
def on_message(self, event):
    """
    Called when a message is received. The message itself can be
    obtained as a property on the event. For the purpose of
    referring to this message in further actions (e.g. if
    explicitly accepting it), the ``delivery`` should be used,
    also obtainable via a property on the event.

    The default implementation does nothing.
    """
562
class TransactionHandler(object):
    """
    The interface for transaction handlers, i.e. objects that want to
    be notified of state changes related to a transaction.

    Every callback here is a no-op.
    """

    def on_transaction_declared(self, event):
        """No-op callback for the ``on_transaction_declared`` event."""

    def on_transaction_committed(self, event):
        """No-op callback for the ``on_transaction_committed`` event."""

    def on_transaction_aborted(self, event):
        """No-op callback for the ``on_transaction_aborted`` event."""

    def on_transaction_declare_failed(self, event):
        """No-op callback for the ``on_transaction_declare_failed`` event."""

    def on_transaction_commit_failed(self, event):
        """No-op callback for the ``on_transaction_commit_failed`` event."""
584
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    An extension to the MessagingHandler for applications using
    transactions.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        """
        Pass all arguments on to the parent initialiser. Unlike the
        parent, ``auto_accept`` defaults to ``False`` here.
        """
        parent = super(TransactionalClientHandler, self)
        parent.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accept ``delivery``, through ``transaction`` when one is given
        and through the inherited ``accept`` otherwise.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)
600
601 602 -class FlowController(Handler):
def __init__(self, window=1024):
    """
    Remember the credit ``window`` and start the drained counter at
    zero. Both values are used by ``_flow``.
    """
    self._drained = 0
    self._window = window
606 609 612 615
def on_delivery(self, event):
    """
    Run ``_flow`` on the link the delivery event belongs to.
    """
    link = event.link
    self._flow(link)
618
def _flow(self, link):
    """
    Top up the credit of a receiver ``link`` to the configured window.

    Links that are not receivers are ignored. The value returned by
    ``link.drained()`` is added to the running ``_drained`` counter,
    and credit is issued only while that counter is zero.
    """
    if not link.is_receiver:
        return
    self._drained += link.drained()
    if self._drained != 0:
        return
    shortfall = self._window - link.credit
    link.flow(shortfall)
625
626 627 -class Handshaker(Handler):
@staticmethod
def on_connection_remote_open(event):
    """
    Open the connection locally if the peer opened it while the local
    end is still uninitialised.
    """
    connection = event.connection
    if not connection.state & Endpoint.LOCAL_UNINIT:
        return
    connection.open()
@staticmethod
def on_session_remote_open(event):
    """
    Open the session locally if the peer opened it while the local
    end is still uninitialised.
    """
    ssn = event.session
    # ``state`` is read as a plain attribute everywhere else in this
    # module; calling it, as this method used to, is inconsistent with
    # those uses.
    if ssn.state & Endpoint.LOCAL_UNINIT:
        ssn.open()
640 641 @staticmethod 648 649 @staticmethod
def on_connection_remote_close(event):
    """
    Close the connection locally when the peer closes it, unless the
    local end is already closed.
    """
    connection = event.connection
    locally_closed = connection.state & Endpoint.LOCAL_CLOSED
    if locally_closed:
        return
    connection.close()
@staticmethod
def on_session_remote_close(event):
    """
    Close the session locally when the peer closes it, unless the
    local end is already closed.
    """
    session = event.session
    locally_closed = session.state & Endpoint.LOCAL_CLOSED
    if locally_closed:
        return
    session.close()
660 661 @staticmethod
666 667 668 # Back compatibility definitions 669 CFlowController = FlowController 670 CHandshaker = Handshaker 671 672 673 from ._events import WrappedHandler 674 from cproton import pn_iohandler
class IOHandler(WrappedHandler):
    """
    A ``WrappedHandler`` constructed from ``pn_iohandler``.
    """

    def __init__(self):
        """Initialise the wrapper with ``pn_iohandler``."""
        WrappedHandler.__init__(self, pn_iohandler)
680
class PythonIO:
    """
    A handler that keeps track of the reactor's selectables and waits
    on them with ``select`` whenever the reactor is quiesced.

    Events this class has no method for are passed on to an
    ``IOHandler`` delegate.
    """

    def __init__(self):
        """Start with no selectables and a fresh ``IOHandler`` delegate."""
        self.delegate = IOHandler()
        self.selectables = []

    def on_unhandled(self, method, event):
        """Hand any event without a dedicated method to the delegate."""
        fallback = self.delegate
        event.dispatch(fallback)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried in ``event.context``."""
        selectable = event.context
        self.selectables.append(selectable)

    def on_selectable_updated(self, event):
        """Nothing to do when a selectable is updated."""

    def on_selectable_final(self, event):
        """Stop tracking a terminal selectable and release it."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.release()

    def on_reactor_quiesced(self, event):
        """
        Wait for I/O on the tracked selectables and notify them.

        The wait lasts until the earliest selectable deadline, clamped
        to the range ``0..reactor.timeout``. Afterwards readable,
        writable and expired selectables are notified, and the reactor
        is marked and yielded.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced:
            return

        reading = [sel for sel in self.selectables if sel.reading]
        writing = [sel for sel in self.selectables if sel.writing]
        deadlines = [sel.deadline for sel in self.selectables if sel.deadline]

        if deadlines:
            timeout = min(deadlines) - time.time()
        else:
            timeout = reactor.timeout
        if timeout < 0:
            timeout = 0
        timeout = min(timeout, reactor.timeout)
        readable, writable, _ = select(reading, writing, [], timeout)

        reactor.mark()

        now = time.time()

        for sel in readable:
            sel.readable()
        for sel in writable:
            sel.writable()
        for sel in self.selectables:
            if sel.deadline and now > sel.deadline:
                sel.expired()

        reactor.yield_()
744