Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  from __future__ import absolute_import 
 21   
 22  import os 
 23  import logging 
 24  import traceback 
 25   
 26  from proton import Connection, Delivery, Described 
 27  from proton import Endpoint, EventType, Handler, Link, Message 
 28  from proton import Session, SSL, SSLDomain, SSLUnavailable, symbol 
 29  from proton import Terminus, Transport, ulong, Url 
 30  from proton.handlers import OutgoingMessageHandler 
 31   
 32  from proton import generate_uuid 
 33   
 34  from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode 
 35   
 36  from ._events import EventBase 
 37  from ._reactor_impl import Selectable, WrappedHandler, _chandler 
 38  from ._wrapper import Wrapper, PYCTX 
 39   
 40  from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \ 
 41      pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \ 
 42      pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \ 
 43      pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \ 
 44      pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \ 
 45      pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \ 
 46      pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \ 
 47      pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \ 
 48      pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup 
 49   
 50  from . import _compat 
 51   
 52  from ._compat import queue 
 53   
 54  log = logging.getLogger("proton") 
def _timeout2millis(secs):
    """Convert a timeout in seconds to the millisecond form used by the reactor.

    ``None`` maps to ``PN_MILLIS_MAX``; any other value is converted with
    ``secs2millis``.
    """
    return PN_MILLIS_MAX if secs is None else secs2millis(secs)
60
def _millis2timeout(millis):
    """Convert a reactor timeout in milliseconds back to seconds.

    ``PN_MILLIS_MAX`` maps to ``None``; any other value is converted with
    ``millis2secs``.
    """
    return None if millis == PN_MILLIS_MAX else millis2secs(millis)
65
class Task(Wrapper):
    """Python wrapper around a C reactor task handle."""

    @staticmethod
    def wrap(impl):
        """Return a ``Task`` for ``impl``, or ``None`` when ``impl`` is ``None``."""
        return None if impl is None else Task(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_task_attachments)

    def _init(self):
        # Nothing to initialise for a task.
        pass

    def cancel(self):
        """Cancel the task via ``pn_task_cancel``."""
        pn_task_cancel(self._impl)
84
class Acceptor(Wrapper):
    """Python wrapper around a C reactor acceptor handle."""

    def __init__(self, impl):
        Wrapper.__init__(self, impl)

    def set_ssl_domain(self, ssl_domain):
        """Attach ``ssl_domain`` (its ``_domain`` handle) to this acceptor."""
        pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)

    def close(self):
        """Close the acceptor via ``pn_acceptor_close``."""
        pn_acceptor_close(self._impl)
96
class Reactor(Wrapper):
    """Python wrapper around the C reactor object (``pn_reactor``)."""

    @staticmethod
    def wrap(impl):
        """Wrap an existing C reactor handle.

        Returns ``None`` for a ``None`` handle. If the attributes stored
        under ``PYCTX`` in the reactor's attachments contain a
        ``'subclass'`` entry, that class is instantiated instead of
        ``Reactor``.
        """
        if impl is None:
            return None
        attachments = pn_reactor_attachments(impl)
        attrs = pn_void2py(pn_record_get(attachments, PYCTX))
        if attrs and 'subclass' in attrs:
            return attrs['subclass'](impl=impl)
        return Reactor(impl=impl)

    def __init__(self, *handlers, **kwargs):
        """Create the wrapper and add each of ``handlers`` to ``self.handler``.

        An existing C reactor can be supplied with the ``impl`` keyword;
        otherwise ``pn_reactor`` is handed to ``Wrapper.__init__``.
        """
        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
        for each in handlers:
            self.handler.add(each, on_error=self.on_error_delegate())

    def _init(self):
        # Exception triples recorded by on_error(), reported by _check_errors().
        self.errors = []

    # on_error relay handler tied to underlying C reactor.  Use when the
    # error will always be generated from a callback from this reactor.
    # Needed to prevent reference cycles and be compatible with wrappers.
    class ErrorDelegate(object):
        def __init__(self, reactor):
            # Keep only the C handle, not the Python wrapper.
            self.reactor_impl = reactor._impl

        def on_error(self, info):
            Reactor.wrap(self.reactor_impl).on_error(info)

    def on_error_delegate(self):
        """Return a bound ``on_error`` of a fresh ``ErrorDelegate``."""
        delegate = Reactor.ErrorDelegate(self)
        return delegate.on_error

    def on_error(self, info):
        """Record ``info`` in ``self.errors`` and yield the reactor."""
        self.errors.append(info)
        self.yield_()

    def _get_global(self):
        c_handler = pn_reactor_get_global_handler(self._impl)
        return WrappedHandler.wrap(c_handler, self.on_error_delegate())

    def _set_global(self, handler):
        impl = _chandler(handler, self.on_error_delegate())
        pn_reactor_set_global_handler(self._impl, impl)
        pn_decref(impl)

    global_handler = property(_get_global, _set_global)

    def _get_timeout(self):
        return _millis2timeout(pn_reactor_get_timeout(self._impl))

    def _set_timeout(self, secs):
        return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))

    timeout = property(_get_timeout, _set_timeout)

    def yield_(self):
        pn_reactor_yield(self._impl)

    def mark(self):
        return pn_reactor_mark(self._impl)

    def _get_handler(self):
        c_handler = pn_reactor_get_handler(self._impl)
        return WrappedHandler.wrap(c_handler, self.on_error_delegate())

    def _set_handler(self, handler):
        impl = _chandler(handler, self.on_error_delegate())
        pn_reactor_set_handler(self._impl, impl)
        pn_decref(impl)

    handler = property(_get_handler, _set_handler)

    def run(self):
        """Start the reactor, process until ``process()`` returns a false
        value, then stop, process once more and drop both handlers.
        """
        self.timeout = 3.14159265359
        self.start()
        while self.process():
            pass
        self.stop()
        self.process()
        self.global_handler = None
        self.handler = None

    def wakeup(self):
        """Wake the reactor; raise ``IOError`` on a non-zero result."""
        status = pn_reactor_wakeup(self._impl)
        if status:
            raise IOError(pn_error_text(pn_reactor_error(self._impl)))

    def start(self):
        pn_reactor_start(self._impl)

    @property
    def quiesced(self):
        return pn_reactor_quiesced(self._impl)

    def _check_errors(self):
        """Print every recorded error except the last, then re-raise the last."""
        if not self.errors:
            return
        for exc, value, tb in self.errors[:-1]:
            traceback.print_exception(exc, value, tb)
        exc, value, tb = self.errors[-1]
        _compat.raise_(exc, value, tb)

    def process(self):
        """Run ``pn_reactor_process`` and surface any recorded errors."""
        outcome = pn_reactor_process(self._impl)
        self._check_errors()
        return outcome

    def stop(self):
        pn_reactor_stop(self._impl)
        self._check_errors()

    def schedule(self, delay, task):
        """Schedule handler ``task`` after ``delay`` seconds; return a ``Task``."""
        impl = _chandler(task, self.on_error_delegate())
        scheduled = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
        pn_decref(impl)
        return scheduled

    def acceptor(self, host, port, handler=None):
        """Create an ``Acceptor`` on ``host``/``port``.

        @raise IOError: if the underlying call returns no acceptor.
        """
        impl = _chandler(handler, self.on_error_delegate())
        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
        pn_decref(impl)
        if not aimpl:
            raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
        return Acceptor(aimpl)

    def connection(self, handler=None):
        """Deprecated: use connection_to_host() instead
        """
        impl = _chandler(handler, self.on_error_delegate())
        wrapped = Connection.wrap(pn_reactor_connection(self._impl, impl))
        if impl:
            pn_decref(impl)
        return wrapped

    def connection_to_host(self, host, port, handler=None):
        """Create an outgoing Connection that will be managed by the reactor.
        The reactor's pn_iohandler will create a socket connection to the host
        once the connection is opened.
        """
        conn = self.connection(handler)
        self.set_connection_host(conn, host, port)
        return conn

    def set_connection_host(self, connection, host, port):
        """Change the address used by the connection.  The address is
        used by the reactor's iohandler to create an outgoing socket
        connection.  This must be set prior to opening the connection.
        """
        pn_reactor_set_connection_host(
            self._impl,
            connection._impl,
            unicode2utf8(str(host)),
            unicode2utf8(str(port)))

    def get_connection_address(self, connection):
        """This may be used to retrieve the remote peer address.
        @return: string containing the address in URL format or None if no
        address is available.  Use the proton.Url class to create a Url object
        from the returned value.
        """
        raw_url = pn_reactor_get_connection_address(self._impl, connection._impl)
        return utf82unicode(raw_url)

    def selectable(self, handler=None):
        """Create a ``Selectable``; when a C handler is produced for
        ``handler`` it is stored in the selectable's attachments.
        """
        impl = _chandler(handler, self.on_error_delegate())
        sel = Selectable.wrap(pn_reactor_selectable(self._impl))
        if impl:
            pn_record_set_handler(pn_selectable_attachments(sel._impl), impl)
            pn_decref(impl)
        return sel

    def update(self, sel):
        pn_reactor_update(self._impl, sel._impl)

    def push_event(self, obj, etype):
        """Put ``obj`` on the reactor's collector as a ``PN_PYREF`` event
        with type number ``etype.number``.
        """
        collector = pn_reactor_collector(self._impl)
        pn_collector_put(collector, PN_PYREF, pn_py2void(obj), etype.number)
from ._events import wrappers as _wrappers

# Register converters for the "pn_reactor" and "pn_task" keys: each casts
# the raw value to its C type and wraps it in the matching Python class.
_wrappers["pn_reactor"] = lambda raw: Reactor.wrap(pn_cast_pn_reactor(raw))
_wrappers["pn_task"] = lambda raw: Task.wrap(pn_cast_pn_task(raw))
class EventInjector(object):
    """
    Can be added to a reactor to allow events to be triggered by an
    external thread but handled on the event thread associated with
    the reactor. An instance of this class can be passed to the
    Reactor.selectable() method of the reactor in order to activate
    it. The close() method should be called when it is no longer
    needed, to allow the event loop to end if needed.
    """

    def __init__(self):
        self.queue = queue.Queue()
        # (read_fd, write_fd); a byte is written to wake the reactor.
        self.pipe = os.pipe()
        self._closed = False

    def _notify(self):
        # Write one byte so the read end of the pipe becomes readable.
        os.write(self.pipe[1], b"!")

    def trigger(self, event):
        """
        Request that the given event be dispatched on the event thread
        of the reactor to which this EventInjector was added.
        """
        self.queue.put(event)
        self._notify()

    def close(self):
        """
        Request that this EventInjector be closed. Existing events
        will be dispatched on the reactors event dispatch thread,
        then this will be removed from the set of interest.
        """
        self._closed = True
        self._notify()

    def fileno(self):
        """Return the read end of the pipe."""
        return self.pipe[0]

    def on_selectable_init(self, event):
        selectable = event.context
        selectable.fileno(self.fileno())
        selectable.reading = True
        event.reactor.update(selectable)

    def on_selectable_readable(self, event):
        # Read up to 512 pending wake-up bytes, then forward every queued event.
        os.read(self.pipe[0], 512)
        reactor = event.reactor
        while not self.queue.empty():
            pending = self.queue.get()
            reactor.push_event(pending.context, pending.type)
        if not self._closed:
            return
        selectable = event.context
        selectable.terminate()
        event.reactor.update(selectable)
331
class ApplicationEvent(EventBase):
    """
    Application defined event, which can optionally be associated with
    an engine object and or an arbitrary subject
    """

    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
        self.connection = connection
        self.session = session
        self.link = link
        self.delivery = delivery
        # A more specific object overrides the broader ones passed in:
        # delivery -> link -> session -> connection.
        if self.delivery:
            self.link = self.delivery.link
        if self.link:
            self.session = self.link.session
        if self.session:
            self.connection = self.session.connection
        self.subject = subject

    def __repr__(self):
        parts = []
        for obj in (self.connection, self.session, self.link, self.delivery, self.subject):
            if obj is not None:
                parts.append(str(obj))
        return "%s(%s)" % (self.type, ", ".join(parts))
356
class Transaction(object):
    """
    Class to track state of an AMQP 1.0 transaction.
    """

    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
        """Store the control link and handler, then send the declare message."""
        self.txn_ctrl = txn_ctrl
        self.handler = handler
        self.id = None
        self._declare = None
        self._discharge = None
        self.failed = False
        # Deliveries accepted under this transaction but not yet settled.
        self._pending = []
        self.settle_before_discharge = settle_before_discharge
        self.declare()

    def commit(self):
        self.discharge(False)

    def abort(self):
        self.discharge(True)

    def declare(self):
        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])

    def discharge(self, failed):
        self.failed = failed
        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])

    def _send_ctrl(self, descriptor, value):
        """Send a described control message and tag its delivery with this transaction."""
        body = Described(descriptor, value)
        delivery = self.txn_ctrl.send(Message(body=body))
        delivery.transaction = self
        return delivery

    def send(self, sender, msg, tag=None):
        """Send ``msg`` on ``sender`` and mark the delivery with this transaction's id."""
        dlv = sender.send(msg, tag=tag)
        dlv.local.data = [self.id]
        # NOTE(review): 0x34 is presumably the transactional-state code - confirm.
        dlv.update(0x34)
        return dlv

    def accept(self, delivery):
        """Accept ``delivery`` under this transaction; settle now or keep it pending."""
        self.update(delivery, PN_ACCEPTED)
        if not self.settle_before_discharge:
            self._pending.append(delivery)
        else:
            delivery.settle()

    def update(self, delivery, state=None):
        # Nothing is done when state is falsy.
        if not state:
            return
        delivery.local.data = [self.id, Described(ulong(state), [])]
        delivery.update(0x34)

    def _release_pending(self):
        for pending in self._pending:
            pending.update(Delivery.RELEASED)
            pending.settle()
        self._clear_pending()

    def _clear_pending(self):
        self._pending = []

    def handle_outcome(self, event):
        """Dispatch the outcome of the declare or discharge delivery to the handler."""
        if event.delivery == self._declare:
            if event.delivery.remote.data:
                self.id = event.delivery.remote.data[0]
                self.handler.on_transaction_declared(event)
                return
            if event.delivery.remote_state != Delivery.REJECTED:
                log.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
            self.handler.on_transaction_declare_failed(event)
        elif event.delivery == self._discharge:
            rejected = event.delivery.remote_state == Delivery.REJECTED
            if rejected:
                if not self.failed:
                    self.handler.on_transaction_commit_failed(event)
                    self._release_pending()  # make this optional?
            elif self.failed:
                self.handler.on_transaction_aborted(event)
                self._release_pending()
            else:
                self.handler.on_transaction_committed(event)
            self._clear_pending()
441
class LinkOption(object):
    """
    Abstract interface for link configuration options
    """

    def apply(self, link):
        """
        Hook for subclasses: put the configuration logic for ``link``
        here. The base implementation does nothing.
        """
        return None

    def test(self, link):
        """
        Hook for subclasses that want to apply an option selectively,
        e.g. based on some link criteria. The base implementation
        always returns True.
        """
        return True
461
462 463 -class AtMostOnce(LinkOption):
464 - def apply(self, link):
466
467 468 -class AtLeastOnce(LinkOption):
469 - def apply(self, link):
472
class SenderOption(LinkOption):
    """Link option whose ``test`` passes only for sender links."""

    def apply(self, sender):
        # No configuration at this level.
        pass

    def test(self, link):
        """Return ``link.is_sender``."""
        return link.is_sender
478
class ReceiverOption(LinkOption):
    """Link option whose ``test`` passes only for receiver links."""

    def apply(self, receiver):
        # No configuration at this level.
        pass

    def test(self, link):
        """Return ``link.is_receiver``."""
        return link.is_receiver
484
485 486 -class DynamicNodeProperties(LinkOption):
def __init__(self, props=None):
    """Build ``self.properties`` from ``props`` with every key as a ``symbol``.

    @param props: optional mapping; keys that are not already ``symbol``
    instances are converted with ``symbol(key)``. ``None`` (the default)
    is treated as an empty mapping.
    """
    # A None default avoids the shared mutable default argument.
    if props is None:
        props = {}
    self.properties = {}
    for key in props:
        sym = key if isinstance(key, symbol) else symbol(key)
        self.properties[sym] = props[key]
494
495 - def apply(self, link):
500
class Filter(ReceiverOption):
    """Receiver option that puts a filter set on the receiver's source."""

    def __init__(self, filter_set=None):
        """
        @param filter_set: mapping passed to ``put_dict`` in apply().
        When omitted, each instance gets its own new empty dict rather
        than sharing one default dict across all instances.
        """
        self.filter_set = {} if filter_set is None else filter_set

    def apply(self, receiver):
        """Write ``self.filter_set`` into ``receiver.source.filter``."""
        receiver.source.filter.put_dict(self.filter_set)
508
class Selector(Filter):
    """
    Configures a link with a message selector filter
    """

    def __init__(self, value, name='selector'):
        """Build a one-entry filter set mapping ``symbol(name)`` to the
        described selector string ``value``.
        """
        key = symbol(name)
        described = Described(symbol('apache.org:selector-filter:string'), value)
        super(Selector, self).__init__({key: described})
517
518 519 -class DurableSubscription(ReceiverOption):
520 - def apply(self, receiver):
523
524 525 -class Move(ReceiverOption):
526 - def apply(self, receiver):
528
529 530 -class Copy(ReceiverOption):
531 - def apply(self, receiver):
533 542
543 544 -def _create_session(connection, handler=None):
545 session = connection.session() 546 session.open() 547 return session
548
549 550 -def _get_attr(target, name):
551 if hasattr(target, name): 552 return getattr(target, name) 553 else: 554 return None
555
class SessionPerConnection(object):
    """Session policy that creates one session lazily and then reuses it."""

    def __init__(self):
        self._default_session = None

    def session(self, connection):
        """Return the cached session, creating it on ``connection`` on first use."""
        if self._default_session:
            return self._default_session
        self._default_session = _create_session(connection)
        return self._default_session
565
class GlobalOverrides(object):
    """
    Internal handler that offers each event to the ``_overrides``
    handler of the event's connection (when there is one) and falls
    back to the wrapped base handler otherwise.
    """

    def __init__(self, base):
        self.base = base

    def on_unhandled(self, name, event):
        # Dispatch to the base handler unless the override dispatch
        # returned a true value.
        if self._override(event):
            return
        event.dispatch(self.base)

    def _override(self, event):
        """Dispatch to ``event.connection._overrides`` if present.

        Returns the dispatch result, or a false value when the event
        has no connection or the connection has no ``_overrides``.
        """
        conn = event.connection
        if not conn:
            return conn
        if not hasattr(conn, '_overrides'):
            return False
        return event.dispatch(conn._overrides)
583
class Connector(Handler):
    """
    Internal handler that triggers the necessary socket connect for an
    opened connection.
    """

    def __init__(self, connection):
        self.connection = connection
        self.address = None
        self.heartbeat = None
        self.reconnect = None
        self.ssl_domain = None
        self.allow_insecure_mechs = True
        self.allowed_mechs = None
        self.sasl_enabled = True
        self.user = None
        self.password = None
        self.virtual_host = None
        self.ssl_sni = None
        self.max_frame_size = None

    def _connect(self, connection, reactor):
        """Point ``connection`` at the next address and bind a new transport.

        @raise SSLUnavailable: for an ``amqps`` URL when no SSL domain is set.
        """
        assert (reactor is not None)
        url = self.address.next()
        reactor.set_connection_host(connection, url.host, str(url.port))
        # if virtual-host not set, use host from address as default
        if self.virtual_host is None:
            connection.hostname = url.host
        log.debug("connecting to %r..." % url)

        transport = Transport()
        if self.sasl_enabled:
            sasl = transport.sasl()
            sasl.allow_insecure_mechs = self.allow_insecure_mechs
            # Credentials in the URL take precedence over the configured ones.
            if url.username:
                connection.user = url.username
            elif self.user:
                connection.user = self.user
            if url.password:
                connection.password = url.password
            elif self.password:
                connection.password = self.password
            if self.allowed_mechs:
                sasl.allowed_mechs(self.allowed_mechs)
        transport.bind(connection)
        if self.heartbeat:
            transport.idle_timeout = self.heartbeat
        if url.scheme == 'amqps':
            if not self.ssl_domain:
                raise SSLUnavailable("amqps: SSL libraries not found")
            self.ssl = SSL(transport, self.ssl_domain)
            self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
        if self.max_frame_size:
            transport.max_frame_size = self.max_frame_size

    def on_connection_local_open(self, event):
        self._connect(event.connection, event.reactor)

    def on_connection_remote_open(self, event):
        log.debug("connected to %s" % event.connection.hostname)
        if self.reconnect:
            self.reconnect.reset()
            self.transport = None

    def on_transport_tail_closed(self, event):
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """Reconnect (immediately or after a delay) while the connection is
        still locally active and a reconnect strategy is set; otherwise
        drop the reference to the connection.
        """
        if self.connection is None:
            return
        if self.connection.state & Endpoint.LOCAL_ACTIVE:
            if not self.reconnect:
                log.debug("Disconnected")
            else:
                event.transport.unbind()
                delay = self.reconnect.next()
                if delay == 0:
                    log.info("Disconnected, reconnecting...")
                    self._connect(self.connection, event.reactor)
                else:
                    log.info("Disconnected will try to reconnect after %s seconds" % delay)
                    event.reactor.schedule(delay, self)
                return
        # See connector.cpp: conn.free()/pn_connection_release() here?
        self.connection = None

    def on_timer_task(self, event):
        self._connect(self.connection, event.reactor)
673
class Backoff(object):
    """
    A reconnect strategy involving an increasing delay between
    retries, up to a maximum of 10 seconds.
    """

    def __init__(self):
        self.delay = 0

    def reset(self):
        """Return to the initial state (no delay)."""
        self.delay = 0

    def next(self):
        """Return the current delay and advance to the following one.

        The sequence is 0, 0.1, then doubling, capped at 10.
        """
        previous = self.delay
        self.delay = 0.1 if previous == 0 else min(10, 2 * previous)
        return previous
694
class Urls(object):
    """Endless round-robin iterator over a list of ``Url`` objects."""

    def __init__(self, values):
        """
        @param values: iterable of values, each converted with ``Url``.
        """
        self.values = [Url(v) for v in values]
        self.i = iter(self.values)

    def __iter__(self):
        return self

    def next(self):
        """Return the next URL, restarting from the first one after the last."""
        try:
            return next(self.i)
        except StopIteration:
            self.i = iter(self.values)
            return next(self.i)

    # Python 3 iterator protocol: __iter__ returns self, so the object
    # must also provide __next__ to work with next() and for-loops.
    __next__ = next
710
class SSLConfig(object):
    """Holds a client-mode and a server-mode ``SSLDomain`` configured alike."""

    def __init__(self):
        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
        self.server = SSLDomain(SSLDomain.MODE_SERVER)

    def set_credentials(self, cert_file, key_file, password):
        """Apply the same credentials to the client and server domains."""
        for domain in (self.client, self.server):
            domain.set_credentials(cert_file, key_file, password)

    def set_trusted_ca_db(self, certificate_db):
        """Apply the same trusted CA database to the client and server domains."""
        for domain in (self.client, self.server):
            domain.set_trusted_ca_db(certificate_db)
724
725 726 -class Container(Reactor):
727 """A representation of the AMQP concept of a 'container', which 728 loosely speaking is something that establishes links to or from 729 another container, over which messages are transferred. This is 730 an extension to the Reactor class that adds convenience methods 731 for creating connections and sender- or receiver- links. 732 """ 733
def __init__(self, *handlers, **kwargs):
    """Initialise the reactor and, unless wrapping an existing ``impl``,
    set up the container defaults.

    When ``impl`` is passed as a keyword nothing beyond the base
    initialisation is done.
    """
    super(Container, self).__init__(*handlers, **kwargs)
    if "impl" in kwargs:
        return
    try:
        self.ssl = SSLConfig()
    except SSLUnavailable:
        self.ssl = None
    base_handler = kwargs.get('global_handler', self.global_handler)
    self.global_handler = GlobalOverrides(base_handler)
    self.trigger = None
    self.container_id = str(generate_uuid())
    self.allow_insecure_mechs = True
    self.allowed_mechs = None
    self.sasl_enabled = True
    self.user = None
    self.password = None
    # Recorded so that Reactor.wrap() can re-create this class.
    Wrapper.__setattr__(self, 'subclass', self.__class__)
750
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
            **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    @param url: URL string of process to connect to

    @param urls: list of URL strings of process to try to connect to

    Only one of url or urls should be specified.

    @param address: used as the connector's address source when
    neither url nor urls is given.

    @param reconnect: Reconnect is enabled by default (a Backoff is
    used when this is None). You can pass in an instance of Backoff
    to control reconnect behavior. A value of False will prevent the
    library from automatically trying to reconnect if the underlying
    socket is disconnected before the connection has been closed.

    @param heartbeat: The desired frequency of heartbeats used to
    test the underlying socket is alive. The value is stored
    unchanged on the connector. NOTE(review): earlier documentation
    gave the unit as milliseconds; confirm against
    Transport.idle_timeout.

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain.

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param kwargs: 'sasl_enabled', which determines whether a sasl
    layer is used for the connection; 'allowed_mechs', an optional
    string containing a space-separated list of SASL mechanisms to
    allow if sasl is enabled; 'allow_insecure_mechs', a flag
    indicating whether insecure mechanisms, such as PLAIN over a
    non-encrypted socket, are allowed; 'virtual_host', the
    hostname to set in the Open performative used by peer to
    determine the correct back-end service for the client. If
    'virtual_host' is not supplied the host field from the URL is
    used instead; 'user', the user to authenticate; 'password',
    the authentication secret. Also read: 'offered_capabilities',
    'desired_capabilities', 'properties', 'sni' and
    'max_frame_size'.

    @raise ValueError: if none of url, urls or address is given.
    """
    option = kwargs.get

    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())
    conn.offered_capabilities = option('offered_capabilities')
    conn.desired_capabilities = option('desired_capabilities')
    conn.properties = option('properties')

    connector = Connector(conn)
    connector.allow_insecure_mechs = option('allow_insecure_mechs', self.allow_insecure_mechs)
    connector.allowed_mechs = option('allowed_mechs', self.allowed_mechs)
    connector.sasl_enabled = option('sasl_enabled', self.sasl_enabled)
    connector.user = option('user', self.user)
    connector.password = option('password', self.password)
    connector.virtual_host = option('virtual_host')
    if connector.virtual_host:
        # only set hostname if virtual-host is a non-empty string
        conn.hostname = connector.virtual_host
    connector.ssl_sni = option('sni')
    connector.max_frame_size = option('max_frame_size')

    conn._overrides = connector
    if url:
        connector.address = Urls([url])
    elif urls:
        connector.address = Urls(urls)
    elif address:
        connector.address = address
    else:
        raise ValueError("One of url, urls or address required")
    if heartbeat:
        connector.heartbeat = heartbeat
    if reconnect:
        connector.reconnect = reconnect
    elif reconnect is None:
        connector.reconnect = Backoff()
    # use container's default client domain if none specified.  This is
    # only necessary if the URL specifies the "amqps:" scheme
    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection()  # todo: make configurable
    conn.open()
    return conn
833
834 - def _get_id(self, container, remote, local):
835 if local and remote: 836 "%s-%s-%s" % (container, remote, local) 837 elif local: 838 return "%s-%s" % (container, local) 839 elif remote: 840 return "%s-%s" % (container, remote) 841 else: 842 return "%s-%s" % (container, str(generate_uuid()))
843
def _get_session(self, context):
    """Resolve ``context`` to a session.

    A ``Url`` is connected to first; a ``Session`` is returned as is; a
    ``Connection`` uses its ``_session_policy`` when it has one and a
    newly created session otherwise; anything else is asked for
    ``context.session()``.
    """
    if isinstance(context, Url):
        return self._get_session(self.connect(url=context))
    if isinstance(context, Session):
        return context
    if isinstance(context, Connection):
        if hasattr(context, '_session_policy'):
            return context._session_policy.session(context)
        return _create_session(context)
    return context.session()
856
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test: a handler is attached whenever one was passed,
    # independent of how the handler defines equality.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
896
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.
    """
    if isstring(context):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test: a handler is attached whenever one was passed,
    # independent of how the handler defines equality.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
937
def declare_transaction(self, context, handler=None, settle_before_discharge=False):
    """Declare a transaction on ``context`` and return a ``Transaction``.

    The first call for a given ``context`` creates a 'txn-ctrl' sender
    whose target is a coordinator and stores it as
    ``context._txn_ctrl``; later calls reuse it. The internal handler
    on that sender forwards unhandled events to the ``handler`` given
    on that first call.
    """
    if not _get_attr(context, '_txn_ctrl'):
        class InternalTransactionHandler(OutgoingMessageHandler):
            def __init__(self):
                super(InternalTransactionHandler, self).__init__(auto_settle=True)

            def on_settled(self, event):
                # Route outcomes of control deliveries to their transaction.
                if hasattr(event.delivery, "transaction"):
                    event.transaction = event.delivery.transaction
                    event.delivery.transaction.handle_outcome(event)

            def on_unhandled(self, method, event):
                if handler:
                    event.dispatch(handler)

        context._txn_ctrl = self.create_sender(
            context, None, name='txn-ctrl', handler=InternalTransactionHandler())
        ctrl_target = context._txn_ctrl.target
        ctrl_target.type = Terminus.COORDINATOR
        context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
    return Transaction(context._txn_ctrl, handler, settle_before_discharge)
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: address to listen on; converted with ``Url``.
    @param ssl_domain: SSL configuration for the acceptor. When
    omitted and the URL scheme is 'amqps', the container's default
    server domain is used.
    @return: the ``Acceptor`` that was created.
    @raise SSLUnavailable: for an 'amqps' URL when no SSL domain is
    given and the container has no SSL configuration.
    """
    url = Url(url)
    ssl_config = ssl_domain
    # Resolve the SSL configuration before creating the acceptor, so
    # that no acceptor is left open when SSL turns out to be unavailable.
    if not ssl_config and url.scheme == 'amqps':
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        # use container's default server domain
        ssl_config = self.ssl.server
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
975
def do_work(self, timeout=None):
    """Run one ``process()`` pass and return its result.

    A truthy ``timeout`` is first assigned to ``self.timeout``; a falsy
    one (including 0) leaves the current timeout unchanged.
    """
    if timeout:
        self.timeout = timeout
    outcome = self.process()
    return outcome
980