1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 from __future__ import absolute_import
21
22 import os
23 import logging
24 import traceback
25
26 from proton import Connection, Delivery, Described
27 from proton import Endpoint, EventType, Handler, Link, Message
28 from proton import Session, SSL, SSLDomain, SSLUnavailable, symbol
29 from proton import Terminus, Transport, ulong, Url
30 from proton.handlers import OutgoingMessageHandler
31
32 from proton import generate_uuid
33
34 from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
35
36 from ._events import EventBase
37 from ._reactor_impl import Selectable, WrappedHandler, _chandler
38 from ._wrapper import Wrapper, PYCTX
39
40 from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
41 pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
42 pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
43 pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
44 pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
45 pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
46 pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
47 pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
48 pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
49
50 from . import _compat
51
52 from ._compat import queue
53
54 log = logging.getLogger("proton")
58 if secs is None: return PN_MILLIS_MAX
59 return secs2millis(secs)
60
63 if millis == PN_MILLIS_MAX: return None
64 return millis2secs(millis)
65
66
67 -class Task(Wrapper):
68
69 @staticmethod
71 if impl is None:
72 return None
73 else:
74 return Task(impl)
75
78
81
83 pn_task_cancel(self._impl)
84
87
90
def set_ssl_domain(self, ssl_domain):
    """
    Attach an SSL domain to this acceptor.

    Hands this acceptor's implementation handle, together with the
    domain's underlying ``_domain`` handle, to
    ``pn_acceptor_set_ssl_domain``.

    @param ssl_domain: object exposing the native SSL domain through
        its ``_domain`` attribute.
    """
    acceptor_impl = self._impl
    native_domain = ssl_domain._domain
    pn_acceptor_set_ssl_domain(acceptor_impl, native_domain)
93
95 pn_acceptor_close(self._impl)
96
99
100 @staticmethod
102 if impl is None:
103 return None
104 else:
105 record = pn_reactor_attachments(impl)
106 attrs = pn_void2py(pn_record_get(record, PYCTX))
107 if attrs and 'subclass' in attrs:
108 return attrs['subclass'](impl=impl)
109 else:
110 return Reactor(impl=impl)
111
112 - def __init__(self, *handlers, **kwargs):
116
119
120
121
122
125 self.reactor_impl = reactor._impl
126
130
133
135 self.errors.append(info)
136 self.yield_()
137
140
142 impl = _chandler(handler, self.on_error_delegate())
143 pn_reactor_set_global_handler(self._impl, impl)
144 pn_decref(impl)
145
146 global_handler = property(_get_global, _set_global)
147
149 return _millis2timeout(pn_reactor_get_timeout(self._impl))
150
152 return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
153
154 timeout = property(_get_timeout, _set_timeout)
155
157 pn_reactor_yield(self._impl)
158
160 return pn_reactor_mark(self._impl)
161
164
166 impl = _chandler(handler, self.on_error_delegate())
167 pn_reactor_set_handler(self._impl, impl)
168 pn_decref(impl)
169
170 handler = property(_get_handler, _set_handler)
171
180
182 n = pn_reactor_wakeup(self._impl)
183 if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
184
186 pn_reactor_start(self._impl)
187
188 @property
190 return pn_reactor_quiesced(self._impl)
191
193 if self.errors:
194 for exc, value, tb in self.errors[:-1]:
195 traceback.print_exception(exc, value, tb)
196 exc, value, tb = self.errors[-1]
197 _compat.raise_(exc, value, tb)
198
200 result = pn_reactor_process(self._impl)
201 self._check_errors()
202 return result
203
205 pn_reactor_stop(self._impl)
206 self._check_errors()
207
213
def acceptor(self, host, port, handler=None):
    """
    Ask the reactor for an acceptor on the given host and port.

    @param host: host name or interface; encoded with unicode2utf8
        before being passed to the reactor.
    @param port: port; converted with str() before being passed on.
    @param handler: optional handler, wrapped via _chandler together
        with this reactor's error delegate.
    @return: an Acceptor wrapping the handle returned by the reactor.
    @raise IOError: when the reactor returns no acceptor; the message
        carries the reactor's error text plus the host and port.
    """
    chandler = _chandler(handler, self.on_error_delegate())
    native = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), chandler)
    # The local reference to the wrapped handler is released whether or
    # not the acceptor could be created.
    pn_decref(chandler)
    if not native:
        reason = pn_error_text(pn_reactor_error(self._impl))
        raise IOError("%s (%s:%s)" % (reason, host, port))
    return Acceptor(native)
222
224 """Deprecated: use connection_to_host() instead
225 """
226 impl = _chandler(handler, self.on_error_delegate())
227 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
228 if impl: pn_decref(impl)
229 return result
230
232 """Create an outgoing Connection that will be managed by the reactor.
233 The reactor's pn_iohandler will create a socket connection to the host
234 once the connection is opened.
235 """
236 conn = self.connection(handler)
237 self.set_connection_host(conn, host, port)
238 return conn
239
241 """Change the address used by the connection. The address is
242 used by the reactor's iohandler to create an outgoing socket
243 connection. This must be set prior to opening the connection.
244 """
245 pn_reactor_set_connection_host(self._impl,
246 connection._impl,
247 unicode2utf8(str(host)),
248 unicode2utf8(str(port)))
249
251 """This may be used to retrieve the remote peer address.
252 @return: string containing the address in URL format or None if no
253 address is available. Use the proton.Url class to create a Url object
254 from the returned value.
255 """
256 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
257 return utf82unicode(_url)
258
260 impl = _chandler(handler, self.on_error_delegate())
261 result = Selectable.wrap(pn_reactor_selectable(self._impl))
262 if impl:
263 record = pn_selectable_attachments(result._impl)
264 pn_record_set_handler(record, impl)
265 pn_decref(impl)
266 return result
267
269 pn_reactor_update(self._impl, sel._impl)
270
272 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
273
274
275 from ._events import wrappers as _wrappers
276
277 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
278 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
282 """
283 Can be added to a reactor to allow events to be triggered by an
284 external thread but handled on the event thread associated with
285 the reactor. An instance of this class can be passed to the
286 Reactor.selectable() method of the reactor in order to activate
287 it. The close() method should be called when it is no longer
288 needed, to allow the event loop to end if needed.
289 """
290
292 self.queue = queue.Queue()
293 self.pipe = os.pipe()
294 self._closed = False
295
297 """
298 Request that the given event be dispatched on the event thread
299 of the reactor to which this EventInjector was added.
300 """
301 self.queue.put(event)
302 os.write(self.pipe[1], b"!")
303
305 """
306 Request that this EventInjector be closed. Existing events
307 will be dispatched on the reactor's event dispatch thread,
308 then this will be removed from the set of interest.
309 """
310 self._closed = True
311 os.write(self.pipe[1], b"!")
312
315
321
331
334 """
335 Application defined event, which can optionally be associated with
336 an engine object and/or an arbitrary subject
337 """
338
339 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
352
356
359 """
360 Class to track state of an AMQP 1.0 transaction.
361 """
362
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Record the collaborators, reset the tracking state and declare
    the transaction straight away.

    @param txn_ctrl: stored as ``self.txn_ctrl``.
    @param handler: stored as ``self.handler``.
    @param settle_before_discharge: stored as
        ``self.settle_before_discharge``; defaults to False.
    """
    # Values supplied by the caller.
    self.txn_ctrl, self.handler = txn_ctrl, handler
    self.settle_before_discharge = settle_before_discharge

    # Nothing has been declared or discharged yet, so there is no id.
    self.id = self._declare = self._discharge = None
    self.failed = False
    self._pending = []

    # Must run last: declare() relies on the attributes set above.
    self.declare()
373
376
379
381 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
382
386
391
392 - def send(self, sender, msg, tag=None):
397
404
405 - def update(self, delivery, state=None):
409
415
418
441
444 """
445 Abstract interface for link configuration options
446 """
447
449 """
450 Subclasses will implement any configuration logic in this
451 method
452 """
453 pass
454
def test(self, link):
    """
    Decide whether this option applies to the given link.

    This default accepts every link. Subclasses may override it to
    apply an option selectively, e.g. based on some link criteria.

    @param link: the link being considered (unused here).
    @return: always True.
    """
    applies = True
    return applies
461
466
472
def apply(self, sender):
    """
    Default no-op: leaves the given sender untouched.

    @param sender: the sender link (unused here).
    @return: None.
    """
    return None
476
478
def apply(self, receiver):
    """
    Default no-op: leaves the given receiver untouched.

    @param receiver: the receiver link (unused here).
    @return: None.
    """
    return None
482
484
500
501
502 -class Filter(ReceiverOption):
504 self.filter_set = filter_set
505
506 - def apply(self, receiver):
508
511 """
512 Configures a link with a message selector filter
513 """
514
515 - def __init__(self, value, name='selector'):
517
520 - def apply(self, receiver):
523
524
525 -class Move(ReceiverOption):
526 - def apply(self, receiver):
528
529
530 -class Copy(ReceiverOption):
531 - def apply(self, receiver):
533
542
548
555
559 self._default_session = None
560
562 if not self._default_session:
563 self._default_session = _create_session(connection)
564 return self._default_session
565
568 """
569 Internal handler that triggers the necessary socket connect for an
570 opened connection.
571 """
572
575
577 if not self._override(event):
578 event.dispatch(self.base)
579
581 conn = event.connection
582 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
583
586 """
587 Internal handler that triggers the necessary socket connect for an
588 opened connection.
589 """
590
605
606 - def _connect(self, connection, reactor):
639
642
648
651
670
673
676 """
677 A reconnect strategy involving an increasing delay between
678 retries, up to a maximum of 10 seconds.
679 """
680
683
686
694
695
696 -class Urls(object):
700
703
705 try:
706 return next(self.i)
707 except StopIteration:
708 self.i = iter(self.values)
709 return next(self.i)
710
724
727 """A representation of the AMQP concept of a 'container', which
728 loosely speaking is something that establishes links to or from
729 another container, over which messages are transferred. This is
730 an extension to the Reactor class that adds convenience methods
731 for creating connections and sender- or receiver- links.
732 """
733
734 - def __init__(self, *handlers, **kwargs):
750
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
            **kwargs):
    """
    Start establishing an AMQP connection and return the resulting
    proton.Connection instance.

    Exactly one source of addresses is used, checked in this order:
    url, urls, address.

    @param url: URL string of the process to connect to.

    @param urls: list of URL strings of processes to try to connect
    to. Only one of url or urls should be specified.

    @param address: used as the connector's address as-is when
    neither url nor urls is given.

    @param handler: a connection scoped handler that will be called
    to process any events in the scope of this connection or its
    child links.

    @param reconnect: reconnect is enabled by default (a Backoff
    instance is installed when this is None). Pass an instance of
    Backoff to control the reconnect behavior, or False to stop the
    library from automatically trying to reconnect if the underlying
    socket is disconnected before the connection has been closed.

    @param heartbeat: desired frequency of heartbeats used to test
    that the underlying socket is alive. Historically documented as
    a value in milliseconds. NOTE(review): this method only stores
    the value on the connector, so confirm the unit where it is used.

    @param ssl_domain: SSL configuration in the form of an instance
    of proton.SSLDomain. When omitted, the container's client domain
    is used if the container has SSL available.

    @param kwargs: 'sasl_enabled', which determines whether a sasl
    layer is used for the connection; 'allowed_mechs', an optional
    string containing a space-separated list of SASL mechanisms to
    allow if sasl is enabled; 'allow_insecure_mechs', a flag
    indicating whether insecure mechanisms, such as PLAIN over a
    non-encrypted socket, are allowed; 'virtual_host', the hostname
    to set in the Open performative used by the peer to determine
    the correct back-end service for the client (if not supplied the
    host field from the URL is used instead); 'user', the user to
    authenticate; 'password', the authentication secret. Also read:
    'offered_capabilities', 'desired_capabilities' and 'properties'
    (copied onto the connection), 'sni' and 'max_frame_size' (copied
    onto the connector).

    @raise ValueError: if none of url, urls or address is given.
    """
    option = kwargs.get

    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())
    for field in ('offered_capabilities', 'desired_capabilities', 'properties'):
        setattr(conn, field, option(field))

    connector = Connector(conn)
    # Per-call keyword arguments win over the container-wide defaults.
    for field in ('allow_insecure_mechs', 'allowed_mechs', 'sasl_enabled', 'user', 'password'):
        setattr(connector, field, option(field, getattr(self, field)))

    virtual_host = option('virtual_host')
    connector.virtual_host = virtual_host
    if virtual_host:
        # Only an explicit virtual host overrides the connection hostname.
        conn.hostname = virtual_host
    connector.ssl_sni = option('sni')
    connector.max_frame_size = option('max_frame_size')

    conn._overrides = connector

    if url:
        target = Urls([url])
    elif urls:
        target = Urls(urls)
    elif address:
        target = address
    else:
        raise ValueError("One of url, urls or address required")
    connector.address = target

    if heartbeat:
        connector.heartbeat = heartbeat

    # None means "use the default strategy"; any other falsy value
    # leaves the connector's reconnect setting alone.
    if reconnect:
        connector.reconnect = reconnect
    elif reconnect is None:
        connector.reconnect = Backoff()

    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
833
def _get_id(self, container, remote, local):
    """
    Build a link name from the container id and the link addresses.

    @param container: container id used as the name prefix.
    @param remote: remote address, or a falsy value if there is none.
    @param local: local address, or a falsy value if there is none.
    @return: "container-remote-local" when both addresses are given,
        "container-<address>" when only one is, and otherwise the
        container id followed by a freshly generated uuid.
    """
    if local and remote:
        # This branch previously built the string but did not return
        # it, so the method fell through and returned None.
        return "%s-%s-%s" % (container, remote, local)
    elif local:
        return "%s-%s" % (container, local)
    elif remote:
        return "%s-%s" % (container, remote)
    else:
        return "%s-%s" % (container, str(generate_uuid()))
843
856
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string (strings
        are converted to Url first).
    @param target: target address for the link.
    @param source: source address for the link.
    @param name: link name; generated via _get_id() when omitted.
    @param handler: link scoped handler; attached whenever it is not
        None.
    @param tags: stored as the sender's tag_generator when given.
    @param options: passed to _apply_link_options() with the sender.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test: a handler must be attached even if it defines its
    # own equality or is falsy.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
896
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string (strings
        are converted to Url first).
    @param source: source address for the link.
    @param target: target address for the link.
    @param name: link name; generated via _get_id() when omitted.
    @param dynamic: when true, the receiver's source is marked as
        dynamic.
    @param handler: link scoped handler; attached whenever it is not
        None.
    @param options: passed to _apply_link_options() with the receiver.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test: a handler must be attached even if it defines its
    # own equality or is falsy.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
937
939 if not _get_attr(context, '_txn_ctrl'):
940 class InternalTransactionHandler(OutgoingMessageHandler):
941 def __init__(self):
942 super(InternalTransactionHandler, self).__init__(auto_settle=True)
943
944 def on_settled(self, event):
945 if hasattr(event.delivery, "transaction"):
946 event.transaction = event.delivery.transaction
947 event.delivery.transaction.handle_outcome(event)
948
949 def on_unhandled(self, method, event):
950 if handler:
951 event.dispatch(handler)
952
953 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
954 context._txn_ctrl.target.type = Terminus.COORDINATOR
955 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
956 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
957
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: URL (string or Url) giving the host, port and scheme
        to listen on.
    @param ssl_domain: SSL configuration for the acceptor. When
        omitted and the URL scheme is 'amqps', the container's
        default server domain is used.
    @return: the acceptor created for the URL's host and port.
    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain
        was given and the container has no SSL support.
    """
    url = Url(url)

    # Work out the SSL configuration before opening the acceptor, so a
    # missing SSL library cannot leave a non-TLS acceptor behind when
    # the exception is raised.
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        # Fall back to the container's default server domain.
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server

    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
975
980