1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 log = logging.getLogger("proton")
42
43 -class Task(Wrapper):
44
45 @staticmethod
47 if impl is None:
48 return None
49 else:
50 return Task(impl)
51
54
57
59 pn_task_cancel(self._impl)
60
62
65
def set_ssl_domain(self, ssl_domain):
    """
    Attach an SSL domain to this acceptor.

    @param ssl_domain: object whose ``_domain`` attribute is handed to
    pn_acceptor_set_ssl_domain together with this acceptor's ``_impl``.
    """
    acceptor_impl = self._impl
    pn_acceptor_set_ssl_domain(acceptor_impl, ssl_domain._domain)
68
70 pn_acceptor_close(self._impl)
71
73
74 @staticmethod
76 if impl is None:
77 return None
78 else:
79 record = pn_reactor_attachments(impl)
80 attrs = pn_void2py(pn_record_get(record, PYCTX))
81 if attrs and 'subclass' in attrs:
82 return attrs['subclass'](impl=impl)
83 else:
84 return Reactor(impl=impl)
85
86 - def __init__(self, *handlers, **kwargs):
90
93
94
95
96
99 self.reactor_impl = reactor._impl
103
106
108 self.errors.append(info)
109 self.yield_()
110
113
115 impl = _chandler(handler, self.on_error_delegate())
116 pn_reactor_set_global_handler(self._impl, impl)
117 pn_decref(impl)
118
119 global_handler = property(_get_global, _set_global)
120
122 return millis2timeout(pn_reactor_get_timeout(self._impl))
123
125 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
126
127 timeout = property(_get_timeout, _set_timeout)
128
130 pn_reactor_yield(self._impl)
131
133 return pn_reactor_mark(self._impl)
134
137
139 impl = _chandler(handler, self.on_error_delegate())
140 pn_reactor_set_handler(self._impl, impl)
141 pn_decref(impl)
142
143 handler = property(_get_handler, _set_handler)
144
153
155 n = pn_reactor_wakeup(self._impl)
156 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
157
159 pn_reactor_start(self._impl)
160
161 @property
163 return pn_reactor_quiesced(self._impl)
164
166 if self.errors:
167 for exc, value, tb in self.errors[:-1]:
168 traceback.print_exception(exc, value, tb)
169 exc, value, tb = self.errors[-1]
170 _compat.raise_(exc, value, tb)
171
173 result = pn_reactor_process(self._impl)
174 self._check_errors()
175 return result
176
178 pn_reactor_stop(self._impl)
179 self._check_errors()
180
182 impl = _chandler(task, self.on_error_delegate())
183 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
184 pn_decref(impl)
185 return task
186
def acceptor(self, host, port, handler=None):
    """
    Ask the reactor for an acceptor bound to the given host and port.

    @param host: host/interface to listen on (converted with unicode2utf8).
    @param port: port to listen on (converted with str()).
    @param handler: optional handler, wrapped with _chandler using this
    reactor's error delegate and passed to pn_reactor_acceptor.

    @return: an Acceptor wrapping the value returned by pn_reactor_acceptor.
    @raise IOError: if pn_reactor_acceptor returns a false value; the
    message carries the reactor's error text plus the host and port.
    """
    handler_impl = _chandler(handler, self.on_error_delegate())
    acceptor_impl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), handler_impl)
    # Drop our reference to the wrapped handler once the call has been made.
    pn_decref(handler_impl)
    if not acceptor_impl:
        raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
    return Acceptor(acceptor_impl)
195
197 """Deprecated: use connection_to_host() instead
198 """
199 impl = _chandler(handler, self.on_error_delegate())
200 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
201 if impl: pn_decref(impl)
202 return result
203
205 """Create an outgoing Connection that will be managed by the reactor.
206 The reactor's pn_iohandler will create a socket connection to the host
207 once the connection is opened.
208 """
209 conn = self.connection(handler)
210 self.set_connection_host(conn, host, port)
211 return conn
212
214 """Change the address used by the connection. The address is
215 used by the reactor's iohandler to create an outgoing socket
216 connection. This must be set prior to opening the connection.
217 """
218 pn_reactor_set_connection_host(self._impl,
219 connection._impl,
220 unicode2utf8(str(host)),
221 unicode2utf8(str(port)))
222
224 """This may be used to retrieve the remote peer address.
225 @return: string containing the address in URL format or None if no
226 address is available. Use the proton.Url class to create a Url object
227 from the returned value.
228 """
229 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
230 return utf82unicode(_url)
231
233 impl = _chandler(handler, self.on_error_delegate())
234 result = Selectable.wrap(pn_reactor_selectable(self._impl))
235 if impl:
236 record = pn_selectable_attachments(result._impl)
237 pn_record_set_handler(record, impl)
238 pn_decref(impl)
239 return result
240
242 pn_reactor_update(self._impl, sel._impl)
243
245 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
246
# Register factories in proton's ``wrappers`` table under the keys
# "pn_reactor" and "pn_task". Each factory casts its argument with the
# matching pn_cast_* function and wraps the result in the Python class.
# NOTE(review): presumably proton consults this table to turn raw C objects
# into Python wrappers -- confirm against proton.wrappers.
from proton import wrappers as _wrappers
_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
253 """
254 Can be added to a reactor to allow events to be triggered by an
255 external thread but handled on the event thread associated with
256 the reactor. An instance of this class can be passed to the
257 Reactor.selectable() method of the reactor in order to activate
258 it. The close() method should be called when it is no longer
259 needed, to allow the event loop to end if needed.
260 """
262 self.queue = Queue.Queue()
263 self.pipe = os.pipe()
264 self._closed = False
265
267 """
268 Request that the given event be dispatched on the event thread
269 of the reactor to which this EventInjector was added.
270 """
271 self.queue.put(event)
272 os.write(self.pipe[1], _compat.str2bin("!"))
273
275 """
276 Request that this EventInjector be closed. Existing events
277 will be dispatched on the reactor's event dispatch thread,
278 then this will be removed from the set of interest.
279 """
280 self._closed = True
281 os.write(self.pipe[1], _compat.str2bin("!"))
282
285
291
301
304 """
305 Application defined event, which can optionally be associated with
306 an engine object and or an arbitrary subject
307 """
308 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
321
325
327 """
328 Class to track state of an AMQP 1.0 transaction.
329 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Record the collaborators for this transaction, reset its state and
    then call declare().

    @param txn_ctrl: stored as ``self.txn_ctrl``.
    @param handler: stored as ``self.handler``.
    @param settle_before_discharge: stored as
    ``self.settle_before_discharge``; defaults to False.
    """
    # Values supplied by the caller.
    self.txn_ctrl = txn_ctrl
    self.handler = handler
    self.settle_before_discharge = settle_before_discharge

    # Initial state: no id, nothing declared or discharged, nothing pending.
    self.id = None
    self.failed = False
    self._pending = []
    self._declare = None
    self._discharge = None

    # Must run last: every attribute above is in place before declare().
    self.declare()
340
343
346
348 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
349
353
358
359 - def send(self, sender, msg, tag=None):
364
371
372 - def update(self, delivery, state=None):
376
382
385
408
410 """
411 Abstract interface for link configuration options
412 """
414 """
415 Subclasses will implement any configuration logic in this
416 method
417 """
418 pass
def test(self, link):
    """
    Report whether this option should be applied to ``link``.

    Subclasses can override this to selectively apply an option,
    e.g. based on some link criteria. This implementation ignores
    ``link`` and always returns True.
    """
    return True
425
429
434
def apply(self, sender):
    """No-op: this implementation performs no configuration on ``sender``."""
    pass
438
def apply(self, receiver):
    """No-op: this implementation performs no configuration on ``receiver``."""
    pass
442
457
460 self.filter_set = filter_set
461
462 - def apply(self, receiver):
464
466 """
467 Configures a link with a message selector filter
468 """
469 - def __init__(self, value, name='selector'):
471
473 - def apply(self, receiver):
476
477 -class Move(ReceiverOption):
478 - def apply(self, receiver):
480
481 -class Copy(ReceiverOption):
482 - def apply(self, receiver):
484
492
497
504
507 self._default_session = None
508
510 if not self._default_session:
511 self._default_session = _create_session(connection)
512 return self._default_session
513
515 """
516 Internal handler that triggers the necessary socket connect for an
517 opened connection.
518 """
521
523 if not self._override(event):
524 event.dispatch(self.base)
525
527 conn = event.connection
528 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
529
531 """
532 Internal handler that triggers the necessary socket connect for an
533 opened connection.
534 """
549
550 - def _connect(self, connection, reactor):
583
586
592
595
614
617
619 """
620 A reconnect strategy involving an increasing delay between
621 retries, up to a maximum of 10 seconds.
622 """
625
628
636
639 self.values = [Url(v) for v in values]
640 self.i = iter(self.values)
641
644
646 try:
647 return next(self.i)
648 except StopIteration:
649 self.i = iter(self.values)
650 return next(self.i)
651
664
667 """A representation of the AMQP concept of a 'container', which
668 loosely speaking is something that establishes links to or from
669 another container, over which messages are transferred. This is
670 an extension to the Reactor class that adds convenience methods
671 for creating connections and sender- or receiver- links.
672 """
673 - def __init__(self, *handlers, **kwargs):
689
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    @param url: URL string of process to connect to

    @param urls: list of URL strings of process to try to connect to

    Only one of url or urls should be specified; url is used if both
    are given.

    @param address: used as the connector's address as-is when neither
    url nor urls is given. A ValueError is raised if url, urls and
    address are all missing.

    @param reconnect: A value of False will prevent the library
    from automatically trying to reconnect if the underlying
    socket is disconnected before the connection has been closed.
    If left as None a Backoff() instance is used.

    @param heartbeat: A value in milliseconds indicating the
    desired frequency of heartbeats used to test the underlying
    socket is alive. NOTE(review): the unit is taken from the
    previous documentation; confirm it against how the connector
    applies the value.

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain. If not given, the container's
    client SSL configuration (self.ssl.client) is used when self.ssl
    is set.

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param kwargs: sasl_enabled, which determines whether a sasl layer is
    used for the connection; allowed_mechs an optional list of SASL
    mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag
    indicating whether insecure mechanisms, such as PLAIN over a
    non-encrypted socket, are allowed; 'virtual_host' the hostname to set
    in the Open performative used by peer to determine the correct
    back-end service for the client. If 'virtual_host' is not supplied the
    host field from the URL is used instead. Also read from kwargs:
    user, password, sni, max_frame_size, offered_capabilities,
    desired_capabilities and properties.
    """
    option = kwargs.get

    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())
    conn.offered_capabilities = option('offered_capabilities')
    conn.desired_capabilities = option('desired_capabilities')
    conn.properties = option('properties')

    connector = Connector(conn)
    # Per-connection settings fall back to the container-wide defaults.
    connector.allow_insecure_mechs = option('allow_insecure_mechs', self.allow_insecure_mechs)
    connector.allowed_mechs = option('allowed_mechs', self.allowed_mechs)
    connector.sasl_enabled = option('sasl_enabled', self.sasl_enabled)
    connector.user = option('user', self.user)
    connector.password = option('password', self.password)
    connector.virtual_host = option('virtual_host')
    if connector.virtual_host:
        # Only set the hostname when virtual_host is a non-empty value.
        conn.hostname = connector.virtual_host
    connector.ssl_sni = option('sni')
    connector.max_frame_size = option('max_frame_size')

    conn._overrides = connector

    # Pick the address source in priority order: url, urls, address.
    if url:
        target_address = Urls([url])
    elif urls:
        target_address = Urls(urls)
    elif address:
        target_address = address
    else:
        raise ValueError("One of url, urls or address required")
    connector.address = target_address

    if heartbeat:
        connector.heartbeat = heartbeat

    # None selects the default strategy; any other false value leaves
    # the connector's reconnect setting untouched.
    if reconnect is None:
        connector.reconnect = Backoff()
    elif reconnect:
        connector.reconnect = reconnect

    # Fall back to the container's client SSL configuration when the
    # caller did not supply one.
    if ssl_domain:
        connector.ssl_domain = ssl_domain
    else:
        connector.ssl_domain = self.ssl and self.ssl.client

    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
762
763 - def _get_id(self, container, remote, local):
764 if local and remote: "%s-%s-%s" % (container, remote, local)
765 elif local: return "%s-%s" % (container, local)
766 elif remote: return "%s-%s" % (container, remote)
767 else: return "%s-%s" % (container, str(generate_uuid()))
768
781
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    name is derived with _get_id() from the connection's container
    id and the target and source addresses.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a proton.Url, or a URL string
    (strings are converted to proton.Url).
    @param target: target address for the link.
    @param source: source address for the link.
    @param name: link name.
    @param handler: if not None, set as the sender's ``handler``.
    @param tags: if given, set as the sender's ``tag_generator``.
    @param options: passed to _apply_link_options() together with the
    sender before the link is opened.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test rather than "!= None": equality against None can be
    # overridden by the handler's own __eq__/__ne__.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
821
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    name is derived with _get_id() from the connection's container
    id and the source and target addresses.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a proton.Url, or a URL string
    (strings are converted to proton.Url).
    @param source: source address for the link.
    @param target: target address for the link.
    @param name: link name.
    @param dynamic: if true, the receiver's source is marked dynamic.
    @param handler: if not None, set as the receiver's ``handler``.
    @param options: passed to _apply_link_options() together with the
    receiver before the link is opened.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test rather than "!= None": equality against None can be
    # overridden by the handler's own __eq__/__ne__.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
862
864 if not _get_attr(context, '_txn_ctrl'):
865 class InternalTransactionHandler(OutgoingMessageHandler):
866 def __init__(self):
867 super(InternalTransactionHandler, self).__init__(auto_settle=True)
868
869 def on_settled(self, event):
870 if hasattr(event.delivery, "transaction"):
871 event.transaction = event.delivery.transaction
872 event.delivery.transaction.handle_outcome(event)
873
874 def on_unhandled(self, method, event):
875 if handler:
876 event.dispatch(handler)
877
878 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
879 context._txn_ctrl.target.type = Terminus.COORDINATOR
880 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
881 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
882
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: URL (string or proton.Url) whose host and port are
    listened on; an 'amqps' scheme requests SSL.
    @param ssl_domain: SSL configuration to set on the acceptor. If not
    given and the scheme is 'amqps', the container's server SSL
    configuration (self.ssl.server) is used.

    @return: the acceptor returned by self.acceptor().
    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain was
    given and the container has no SSL support (self.ssl is false).
    """
    url = Url(url)
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        # Fall back to the container's default server SSL configuration.
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server
    # Create the acceptor only after the SSL configuration is resolved,
    # so an SSLUnavailable failure cannot leave behind a listening
    # acceptor that the caller has no handle to close.
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
900
905