1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
135
137 n = pn_reactor_wakeup(self._impl)
138 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
139
141 pn_reactor_start(self._impl)
142
143 @property
145 return pn_reactor_quiesced(self._impl)
146
148 if self.errors:
149 for exc, value, tb in self.errors[:-1]:
150 traceback.print_exception(exc, value, tb)
151 exc, value, tb = self.errors[-1]
152 _compat.raise_(exc, value, tb)
153
155 result = pn_reactor_process(self._impl)
156 self._check_errors()
157 return result
158
160 pn_reactor_stop(self._impl)
161 self._check_errors()
162 self.global_handler = None
163 self.handler = None
164
166 impl = _chandler(task, self.on_error)
167 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
168 pn_decref(impl)
169 return task
170
171 - def acceptor(self, host, port, handler=None):
172 impl = _chandler(handler, self.on_error)
173 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
174 pn_decref(impl)
175 if aimpl:
176 return Acceptor(aimpl)
177 else:
178 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
179
181 """Deprecated: use connection_to_host() instead
182 """
183 impl = _chandler(handler, self.on_error)
184 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
185 if impl: pn_decref(impl)
186 return result
187
189 """Create an outgoing Connection that will be managed by the reactor.
190 The reator's pn_iohandler will create a socket connection to the host
191 once the connection is opened.
192 """
193 conn = self.connection(handler)
194 self.set_connection_host(conn, host, port)
195 return conn
196
198 """Change the address used by the connection. The address is
199 used by the reactor's iohandler to create an outgoing socket
200 connection. This must be set prior to opening the connection.
201 """
202 pn_reactor_set_connection_host(self._impl,
203 connection._impl,
204 unicode2utf8(str(host)),
205 unicode2utf8(str(port)))
206
208 """This may be used to retrieve the remote peer address.
209 @return: string containing the address in URL format or None if no
210 address is available. Use the proton.Url class to create a Url object
211 from the returned value.
212 """
213 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
214 return utf82unicode(_url)
215
217 impl = _chandler(handler, self.on_error)
218 result = Selectable.wrap(pn_reactor_selectable(self._impl))
219 if impl:
220 record = pn_selectable_attachments(result._impl)
221 pn_record_set_handler(record, impl)
222 pn_decref(impl)
223 return result
224
226 pn_reactor_update(self._impl, sel._impl)
227
229 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
230
231 from proton import wrappers as _wrappers
232 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
233 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
237 """
238 Can be added to a reactor to allow events to be triggered by an
239 external thread but handled on the event thread associated with
240 the reactor. An instance of this class can be passed to the
241 Reactor.selectable() method of the reactor in order to activate
242 it. The close() method should be called when it is no longer
243 needed, to allow the event loop to end if needed.
244 """
246 self.queue = Queue.Queue()
247 self.pipe = os.pipe()
248 self._closed = False
249
251 """
252 Request that the given event be dispatched on the event thread
253 of the reactor to which this EventInjector was added.
254 """
255 self.queue.put(event)
256 os.write(self.pipe[1], _compat.str2bin("!"))
257
259 """
260 Request that this EventInjector be closed. Existing events
261 will be dispctahed on the reactors event dispactch thread,
262 then this will be removed from the set of interest.
263 """
264 self._closed = True
265 os.write(self.pipe[1], _compat.str2bin("!"))
266
269
275
285
288 """
289 Application defined event, which can optionally be associated with
290 an engine object and or an arbitrary subject
291 """
292 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
305
309
311 """
312 Class to track state of an AMQP 1.0 transaction.
313 """
314 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
315 self.txn_ctrl = txn_ctrl
316 self.handler = handler
317 self.id = None
318 self._declare = None
319 self._discharge = None
320 self.failed = False
321 self._pending = []
322 self.settle_before_discharge = settle_before_discharge
323 self.declare()
324
327
330
332 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
333
337
342
343 - def send(self, sender, msg, tag=None):
348
355
356 - def update(self, delivery, state=None):
360
366
369
392
394 """
395 Abstract interface for link configuration options
396 """
398 """
399 Subclasses will implement any configuration logic in this
400 method
401 """
402 pass
403 - def test(self, link):
404 """
405 Subclasses can override this to selectively apply an option
406 e.g. based on some link criteria
407 """
408 return True
409
413
418
420 - def apply(self, sender): pass
422
424 - def apply(self, receiver): pass
426
441
444 self.filter_set = filter_set
445
446 - def apply(self, receiver):
448
450 """
451 Configures a link with a message selector filter
452 """
453 - def __init__(self, value, name='selector'):
455
457 - def apply(self, receiver):
460
461 -class Move(ReceiverOption):
462 - def apply(self, receiver):
464
465 -class Copy(ReceiverOption):
466 - def apply(self, receiver):
468
476
481
488
491 self._default_session = None
492
494 if not self._default_session:
495 self._default_session = _create_session(connection)
496 self._default_session.context = self
497 return self._default_session
498
502
504 """
505 Internal handler that triggers the necessary socket connect for an
506 opened connection.
507 """
510
512 if not self._override(event):
513 event.dispatch(self.base)
514
516 conn = event.connection
517 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
518
520 """
521 Internal handler that triggers the necessary socket connect for an
522 opened connection.
523 """
536
537 - def _connect(self, connection, reactor):
568
571
577
580
595
598
601
603 """
604 A reconnect strategy involving an increasing delay between
605 retries, up to a maximum or 10 seconds.
606 """
609
612
620
623 self.values = [Url(v) for v in values]
624 self.i = iter(self.values)
625
628
630 try:
631 return next(self.i)
632 except StopIteration:
633 self.i = iter(self.values)
634 return next(self.i)
635
648
651 """A representation of the AMQP concept of a 'container', which
652 lossely speaking is something that establishes links to or from
653 another container, over which messages are transfered. This is
654 an extension to the Reactor class that adds convenience methods
655 for creating connections and sender- or receiver- links.
656 """
657 - def __init__(self, *handlers, **kwargs):
673
674 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
675 """
676 Initiates the establishment of an AMQP connection. Returns an
677 instance of proton.Connection.
678
679 @param url: URL string of process to connect to
680
681 @param urls: list of URL strings of process to try to connect to
682
683 Only one of url or urls should be specified.
684
685 @param reconnect: A value of False will prevent the library
686 form automatically trying to reconnect if the underlying
687 socket is disconnected before the connection has been closed.
688
689 @param heartbeat: A value in milliseconds indicating the
690 desired frequency of heartbeats used to test the underlying
691 socket is alive.
692
693 @param ssl_domain: SSL configuration in the form of an
694 instance of proton.SSLdomain.
695
696 @param handler: a connection scoped handler that will be
697 called to process any events in the scope of this connection
698 or its child links
699
700 @param kwargs: sasl_enabled, which determines whether a sasl layer is
701 used for the connection; allowed_mechs an optional list of SASL
702 mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag
703 indicating whether insecure mechanisms, such as PLAIN over a
704 non-encrypted socket, are allowed; 'virtual_host' the hostname to set
705 in the Open performative used by peer to determine the correct
706 back-end service for the client. If 'virtual_host' is not supplied the
707 host field from the URL is used instead."
708
709 """
710 conn = self.connection(handler)
711 conn.container = self.container_id or str(generate_uuid())
712 conn.offered_capabilities = kwargs.get('offered_capabilities')
713 conn.desired_capabilities = kwargs.get('desired_capabilities')
714 conn.properties = kwargs.get('properties')
715
716 connector = Connector(conn)
717 connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
718 connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
719 connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
720 connector.user = kwargs.get('user', self.user)
721 connector.password = kwargs.get('password', self.password)
722 connector.virtual_host = kwargs.get('virtual_host')
723 if connector.virtual_host:
724
725 conn.hostname = connector.virtual_host
726
727 conn._overrides = connector
728 if url: connector.address = Urls([url])
729 elif urls: connector.address = Urls(urls)
730 elif address: connector.address = address
731 else: raise ValueError("One of url, urls or address required")
732 if heartbeat:
733 connector.heartbeat = heartbeat
734 if reconnect:
735 connector.reconnect = reconnect
736 elif reconnect is None:
737 connector.reconnect = Backoff()
738
739
740 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
741 conn._session_policy = SessionPerConnection()
742 conn.open()
743 return conn
744
745 - def _get_id(self, container, remote, local):
746 if local and remote: "%s-%s-%s" % (container, remote, local)
747 elif local: return "%s-%s" % (container, local)
748 elif remote: return "%s-%s" % (container, remote)
749 else: return "%s-%s" % (container, str(generate_uuid()))
750
763
764 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
765 """
766 Initiates the establishment of a link over which messages can
767 be sent. Returns an instance of proton.Sender.
768
769 There are two patterns of use. (1) A connection can be passed
770 as the first argument, in which case the link is established
771 on that connection. In this case the target address can be
772 specified as the second argument (or as a keyword
773 argument). The source address can also be specified if
774 desired. (2) Alternatively a URL can be passed as the first
775 argument. In this case a new connection will be establised on
776 which the link will be attached. If a path is specified and
777 the target is not, then the path of the URL is used as the
778 target address.
779
780 The name of the link may be specified if desired, otherwise a
781 unique name will be generated.
782
783 Various LinkOptions can be specified to further control the
784 attachment.
785 """
786 if isinstance(context, _compat.STRING_TYPES):
787 context = Url(context)
788 if isinstance(context, Url) and not target:
789 target = context.path
790 session = self._get_session(context)
791 snd = session.sender(name or self._get_id(session.connection.container, target, source))
792 if source:
793 snd.source.address = source
794 if target:
795 snd.target.address = target
796 if handler != None:
797 snd.handler = handler
798 if tags:
799 snd.tag_generator = tags
800 _apply_link_options(options, snd)
801 snd.open()
802 return snd
803
804 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
805 """
806 Initiates the establishment of a link over which messages can
807 be received (aka a subscription). Returns an instance of
808 proton.Receiver.
809
810 There are two patterns of use. (1) A connection can be passed
811 as the first argument, in which case the link is established
812 on that connection. In this case the source address can be
813 specified as the second argument (or as a keyword
814 argument). The target address can also be specified if
815 desired. (2) Alternatively a URL can be passed as the first
816 argument. In this case a new connection will be establised on
817 which the link will be attached. If a path is specified and
818 the source is not, then the path of the URL is used as the
819 target address.
820
821 The name of the link may be specified if desired, otherwise a
822 unique name will be generated.
823
824 Various LinkOptions can be specified to further control the
825 attachment.
826 """
827 if isinstance(context, _compat.STRING_TYPES):
828 context = Url(context)
829 if isinstance(context, Url) and not source:
830 source = context.path
831 session = self._get_session(context)
832 rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
833 if source:
834 rcv.source.address = source
835 if dynamic:
836 rcv.source.dynamic = True
837 if target:
838 rcv.target.address = target
839 if handler != None:
840 rcv.handler = handler
841 _apply_link_options(options, rcv)
842 rcv.open()
843 return rcv
844
846 if not _get_attr(context, '_txn_ctrl'):
847 class InternalTransactionHandler(OutgoingMessageHandler):
848 def __init__(self):
849 super(InternalTransactionHandler, self).__init__(auto_settle=True)
850
851 def on_settled(self, event):
852 if hasattr(event.delivery, "transaction"):
853 event.transaction = event.delivery.transaction
854 event.delivery.transaction.handle_outcome(event)
855 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
856 context._txn_ctrl.target.type = Terminus.COORDINATOR
857 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
858 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
859
860 - def listen(self, url, ssl_domain=None):
861 """
862 Initiates a server socket, accepting incoming AMQP connections
863 on the interface and port specified.
864 """
865 url = Url(url)
866 acceptor = self.acceptor(url.host, url.port)
867 ssl_config = ssl_domain
868 if not ssl_config and url.scheme == 'amqps':
869
870 if self.ssl:
871 ssl_config = self.ssl.server
872 else:
873 raise SSLUnavailable("amqps: SSL libraries not found")
874 if ssl_config:
875 acceptor.set_ssl_domain(ssl_config)
876 return acceptor
877
882