Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 import collections, socket, time, threading 20 21 from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 22 from proton import ProtonException, Timeout, Url 23 from proton.reactor import Container 24 from proton.handlers import MessagingHandler, IncomingMessageHandler 25 from cproton import pn_reactor_collector, pn_collector_release59 68 7230 self.connection = connection 31 self.link = link 32 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 33 msg="Opening link %s" % link.name) 34 self._checkClosed()3537 try: 38 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 39 timeout=timeout, 40 msg="Opening link %s" % self.link.name) 41 except Timeout as e: 42 pass 43 self._checkClosed()4446 if self.link.state & Endpoint.REMOTE_CLOSED: 47 self.link.close() 48 if not self.connection.closing: 49 raise LinkDetached(self.link)5052 self.link.close() 53 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 54 msg="Closing link %s" % self.link.name)55 56 # Access to other link attributes.9676 super(BlockingSender, self).__init__(connection, sender) 77 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 78 # this may be followed by a detach, which may contain an error condition, so wait a little... 79 self._waitForClose() 80 # ...but close ourselves if peer does not 81 self.link.close() 82 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)8385 delivery = self.link.send(msg) 86 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, 87 timeout=timeout) 88 if delivery.link.snd_settle_mode != Link.SND_SETTLED: 89 delivery.settle() 90 bad = error_states 91 if bad is None: 92 bad = [Delivery.REJECTED, Delivery.RELEASED] 93 if delivery.remote_state in bad: 94 raise SendException(delivery.remote_state) 95 return delivery134100 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 101 self.connection = connection 102 self.incoming = collections.deque([]) 103 self.unsettled = collections.deque([])104106 self.incoming.append((event.message, event.delivery)) 107 self.connection.container.yield_() # Wake up the wait() loop to handle the message.108110 if event.link.state & Endpoint.LOCAL_ACTIVE: 111 event.link.close() 112 if not self.connection.closing: 113 raise LinkDetached(event.link)114 118 119 @property 122124 message, delivery = self.incoming.popleft() 125 if not delivery.settled: 126 self.unsettled.append(delivery) 127 return message128182138 super(BlockingReceiver, self).__init__(connection, receiver) 139 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 140 # this may be followed by a detach, which may contain an error condition, so wait a little... 141 self._waitForClose() 142 # ...but close ourselves if peer does not 143 self.link.close() 144 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 145 if credit: receiver.flow(credit) 146 self.fetcher = fetcher 147 self.container = connection.container148150 self.fetcher = None 151 # The next line causes a core dump if the Proton-C reactor finalizes 152 # first. The self.container reference prevents out of order reactor 153 # finalization. It may not be set if exception in BlockingLink.__init__ 154 if hasattr(self, "container"): 155 self.link.handler = None # implicit call to reactor156158 if not self.fetcher: 159 raise Exception("Can't call receive on this receiver as a handler was provided") 160 if not self.link.credit: 161 self.link.flow(1) 162 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, 163 timeout=timeout) 164 return self.fetcher.pop()165 168 171 177198186 self.link = link 187 if link.is_sender: 188 txt = "sender %s to %s closed" % (link.name, link.target.address) 189 else: 190 txt = "receiver %s from %s closed" % (link.name, link.source.address) 191 if link.remote_condition: 192 txt += " due to: %s" % link.remote_condition 193 self.condition = link.remote_condition.name 194 else: 195 txt += " by peer" 196 self.condition = None 197 super(LinkDetached, self).__init__(txt)211202 self.connection = connection 203 txt = "Connection %s closed" % connection.hostname 204 if connection.remote_condition: 205 txt += " due to: %s" % connection.remote_condition 206 self.condition = connection.remote_condition.name 207 else: 208 txt += " by peer" 209 self.condition = None 210 super(ConnectionClosed, self).__init__(txt)214 """ 215 A synchronous style connection wrapper. 216 217 This object's implementation uses OS resources. To ensure they 218 are released when the object is no longer in use, make sure that 219 object operations are enclosed in a try block and that close() is 220 always executed on exit. 221 """ 222336223 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):224 self.disconnected = False 225 self.timeout = timeout or 60 226 self.container = container or Container() 227 self.container.timeout = self.timeout 228 self.container.start() 229 self.url = Url(url).defaults() 230 self.conn = None 231 self.closing = False 232 failed = True 233 try: 234 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, 235 heartbeat=heartbeat, **kwargs) 236 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 237 msg="Opening connection") 238 failed = False 239 finally: 240 if failed and self.conn: 241 self.close()242244 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, 245 options=options))246247 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):248 prefetch = credit 249 if handler: 250 fetcher = None 251 if prefetch is None: 252 prefetch = 1 253 else: 254 fetcher = Fetcher(self, credit) 255 return BlockingReceiver( 256 self, 257 self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, 258 options=options), fetcher, credit=prefetch)259261 # TODO: provide stronger interrupt protection on cleanup. See PEP 419 262 if self.closing: 263 return 264 self.closing = True 265 self.container.errors = [] 266 try: 267 if self.conn: 268 self.conn.close() 269 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 270 msg="Closing connection") 271 finally: 272 self.conn.free() 273 # Nothing left to block on. Allow reactor to clean up. 274 self.run() 275 self.conn = None 276 self.container.global_handler = None # break circular ref: container to cadapter.on_error 277 pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive 278 self.container = None279 282284 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 285 while self.container.process(): pass 286 self.container.stop() 287 self.container.process()288290 """Call process until condition() is true""" 291 if timeout is False: 292 timeout = self.timeout 293 if timeout is None: 294 while not condition() and not self.disconnected: 295 self.container.process() 296 else: 297 container_timeout = self.container.timeout 298 self.container.timeout = timeout 299 try: 300 deadline = time.time() + timeout 301 while not condition() and not self.disconnected: 302 self.container.process() 303 if deadline < time.time(): 304 txt = "Connection %s timed out" % self.url 305 if msg: txt += ": " + msg 306 raise Timeout(txt) 307 finally: 308 self.container.timeout = container_timeout 309 if self.disconnected or self._is_closed(): 310 self.container.stop() 311 self.conn.handler = None # break cyclical reference 312 if self.disconnected and not self._is_closed(): 313 raise ConnectionException( 314 "Connection %s disconnected: %s" % (self.url, self.disconnected))315317 if event.link.state & Endpoint.LOCAL_ACTIVE: 318 event.link.close() 319 if not self.closing: 320 raise LinkDetached(event.link)321323 if event.connection.state & Endpoint.LOCAL_ACTIVE: 324 event.connection.close() 325 if not self.closing: 326 raise ConnectionClosed(event.connection)327329 self.on_transport_closed(event)330332 self.on_transport_closed(event)333351340 """Thread-safe atomic counter. Start at start, increment by step.""" 341 self.count, self.step = start, step 342 self.lock = threading.Lock()343345 """Get the next value""" 346 self.lock.acquire() 347 self.count += self.step; 348 result = self.count 349 self.lock.release() 350 return result354 """ 355 Implementation of the synchronous request-response (aka RPC) pattern. 356 @ivar address: Address for all requests, may be None. 357 @ivar connection: Connection for requests and responses. 358 """ 359 360 correlation_id = AtomicCount() 361402 403 @property363 """ 364 Send requests and receive responses. A single instance can send many requests 365 to the same or different addresses. 366 367 @param connection: A L{BlockingConnection} 368 @param address: Address for all requests. 369 If not specified, each request must have the address property set. 370 Successive messages may have different addresses. 371 """ 372 super(SyncRequestResponse, self).__init__() 373 self.connection = connection 374 self.address = address 375 self.sender = self.connection.create_sender(self.address) 376 # dynamic=true generates a unique address dynamically for this receiver. 377 # credit=1 because we want to receive 1 response message initially. 378 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 379 self.response = None380382 """ 383 Send a request message, wait for and return the response message. 384 385 @param request: A L{proton.Message}. If L{self.address} is not set the 386 L{self.address} must be set and will be used. 387 """ 388 if not self.address and not request.address: 389 raise ValueError("Request message has no address: %s" % request) 390 request.reply_to = self.reply_to 391 request.correlation_id = correlation_id = str(self.correlation_id.next()) 392 self.sender.send(request) 393 394 def wakeup(): 395 return self.response and (self.response.correlation_id == correlation_id)396 397 self.connection.wait(wakeup, msg="Waiting for response") 398 response = self.response 399 self.response = None # Ready for next response. 400 self.receiver.flow(1) # Set up credit for the next response. 401 return response405 """Return the dynamic address of our receiver.""" 406 return self.receiver.remote_source.address407409 """Called when we receive a message for our receiver.""" 410 self.response = event.message 411 self.connection.container.yield_() # Wake up the wait() loop to handle the message.412
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Wed Apr 8 02:11:12 2020 | http://epydoc.sourceforge.net |