Package proton :: Module utils
[frames | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, socket, time, threading 
 20   
 21  from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 
 22  from proton import ProtonException, Timeout, Url 
 23  from proton.reactor import Container 
 24  from proton.handlers import MessagingHandler, IncomingMessageHandler 
 25  from cproton import pn_reactor_collector, pn_collector_release 
 59   
class SendException(ProtonException):
    """
    Exception used to indicate an exceptional state/condition on a send request.

    @ivar state: The delivery state passed in by the code raising the exception.
    """

    def __init__(self, state):
        """
        @param state: The delivery state that caused the send to be treated as failed.
        """
        # Initialise the base exception so that str(), repr() and args carry
        # the state, as the other exception classes in this module do.
        super(SendException, self).__init__("Send failed with delivery state %s" % state)
        self.state = state
68
def _is_settled(delivery):
    """
    Tell whether a sent delivery needs no further waiting.

    The result is truthy when the delivery reports itself settled, or when
    the delivery's link uses the Link.SND_SETTLED sender settle mode.
    """
    settled = delivery.settled
    if settled:
        return settled
    return delivery.link.snd_settle_mode == Link.SND_SETTLED
72
class BlockingSender(BlockingLink):
    """Sender link wrapper whose send() blocks until the delivery is settled."""

    def __init__(self, connection, sender):
        """
        Wrap ``sender`` and verify that the peer attached the requested target.

        Raises LinkException when a local target address was requested and the
        remote target address differs from it.
        """
        super(BlockingSender, self).__init__(connection, sender)
        requested = self.link.target and self.link.target.address
        if requested and requested != self.link.remote_target.address:
            # this may be followed by a detach, which may contain an error condition, so wait a little...
            self._waitForClose()
            # ...but close ourselves if peer does not
            self.link.close()
            raise LinkException("Failed to open sender %s, target does not match" % self.link.name)

    def send(self, msg, timeout=False, error_states=None):
        """
        Send ``msg`` and wait until the resulting delivery is settled.

        ``timeout`` is passed through to the connection's wait().
        ``error_states`` lists the remote delivery states that count as a
        failure; when it is None, Delivery.REJECTED and Delivery.RELEASED are
        used. Raises SendException carrying the remote state on failure,
        otherwise returns the delivery.
        """
        delivery = self.link.send(msg)
        description = "Sending on sender %s" % self.link.name
        self.connection.wait(lambda: _is_settled(delivery), msg=description, timeout=timeout)
        # On a pre-settled link there is nothing left to settle locally.
        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
            delivery.settle()
        if error_states is None:
            failure_states = [Delivery.REJECTED, Delivery.RELEASED]
        else:
            failure_states = error_states
        if delivery.remote_state in failure_states:
            raise SendException(delivery.remote_state)
        return delivery
96
class Fetcher(MessagingHandler):
    """
    Handler that queues incoming messages so they can be pulled synchronously.

    ``incoming`` holds (message, delivery) pairs that have not been popped yet.
    ``unsettled`` holds the deliveries of popped messages that were not
    already settled when popped.
    """

    def __init__(self, connection, prefetch):
        """Create the handler with automatic acceptance switched off."""
        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
        self.connection = connection
        self.incoming = collections.deque([])
        self.unsettled = collections.deque([])

    def on_message(self, event):
        """Queue the received message together with its delivery."""
        entry = (event.message, event.delivery)
        self.incoming.append(entry)
        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.

    def on_connection_error(self, event):
        """Raise ConnectionClosed unless the owning connection is already closing."""
        if self.connection.closing:
            return
        raise ConnectionClosed(event.connection)

    @property
    def has_message(self):
        """Number of queued messages; non-zero (truthy) when one is available."""
        return len(self.incoming)

    def pop(self):
        """Remove and return the oldest queued message, tracking its delivery if unsettled."""
        entry = self.incoming.popleft()
        message, delivery = entry
        if not delivery.settled:
            self.unsettled.append(delivery)
        return message

    def settle(self, state=None):
        """Settle the oldest tracked delivery, first updating it to ``state`` when one is given."""
        oldest = self.unsettled.popleft()
        if state:
            oldest.update(state)
        oldest.settle()
134
class BlockingReceiver(BlockingLink):
    """
    Receiver link wrapper offering blocking receive() and explicit settlement.

    receive() and the accept/reject/release/settle calls only work when the
    receiver was created with a fetcher, i.e. without a user supplied handler.
    """

    def __init__(self, connection, receiver, fetcher, credit=1):
        """
        Wrap ``receiver`` and verify that the peer attached the requested source.

        @param connection: The owning connection; its container is retained.
        @param receiver: The underlying receiver link.
        @param fetcher: The message queueing handler, or None when the caller
            supplied its own handler.
        @param credit: Initial credit to issue; nothing is issued when falsy.

        Raises LinkException when a local source address was requested and the
        remote source address differs from it.
        """
        super(BlockingReceiver, self).__init__(connection, receiver)
        if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
            # this may be followed by a detach, which may contain an error condition, so wait a little...
            self._waitForClose()
            # ...but close ourselves if peer does not
            self.link.close()
            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
        if credit:
            receiver.flow(credit)
        self.fetcher = fetcher
        self.container = connection.container

    def __del__(self):
        self.fetcher = None
        # The next line causes a core dump if the Proton-C reactor finalizes
        # first. The self.container reference prevents out of order reactor
        # finalization. It may not be set if exception in BlockingLink.__init__
        if hasattr(self, "container"):
            self.link.handler = None  # implicit call to reactor

    def receive(self, timeout=False):
        """
        Block until a message is available and return it.

        One credit is issued first if the link currently has none. ``timeout``
        is passed through to the connection's wait(). Raises Exception when
        the receiver has no fetcher.
        """
        if not self.fetcher:
            raise Exception("Can't call receive on this receiver as a handler was provided")
        if not self.link.credit:
            self.link.flow(1)
        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name,
                             timeout=timeout)
        return self.fetcher.pop()

    def accept(self):
        """Settle the oldest received delivery with the Delivery.ACCEPTED state."""
        self.settle(Delivery.ACCEPTED)

    def reject(self):
        """Settle the oldest received delivery with the Delivery.REJECTED state."""
        self.settle(Delivery.REJECTED)

    def release(self, delivered=True):
        """
        Settle the oldest received delivery as not processed.

        Uses Delivery.MODIFIED when ``delivered`` is true and
        Delivery.RELEASED otherwise.
        """
        if delivered:
            self.settle(Delivery.MODIFIED)
        else:
            self.settle(Delivery.RELEASED)

    def settle(self, state=None):
        """
        Settle the oldest received delivery, optionally updating it to ``state`` first.

        Raises Exception when the receiver has no fetcher.
        """
        if not self.fetcher:
            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
        self.fetcher.settle(state)
182
class LinkDetached(LinkException):
    """
    Raised to report that a link was closed.

    ``link`` is the link concerned. ``condition`` is the name of the link's
    remote condition, or None when there is no remote condition.
    """

    def __init__(self, link):
        """Build the message from the link's direction, name, address and remote condition."""
        self.link = link
        if link.is_sender:
            description = "sender %s to %s closed" % (link.name, link.target.address)
        else:
            description = "receiver %s from %s closed" % (link.name, link.source.address)
        remote_condition = link.remote_condition
        if not remote_condition:
            description += " by peer"
            self.condition = None
        else:
            description += " due to: %s" % remote_condition
            self.condition = remote_condition.name
        super(LinkDetached, self).__init__(description)
198
class ConnectionClosed(ConnectionException):
    """
    Raised to report that a connection was closed.

    ``connection`` is the connection concerned. ``condition`` is the name of
    its remote condition, or None when there is no remote condition.
    """

    def __init__(self, connection):
        """Build the message from the connection's hostname and remote condition."""
        self.connection = connection
        description = "Connection %s closed" % connection.hostname
        remote_condition = connection.remote_condition
        if not remote_condition:
            description += " by peer"
            self.condition = None
        else:
            description += " due to: %s" % remote_condition
            self.condition = remote_condition.name
        super(ConnectionClosed, self).__init__(description)
211
class BlockingConnection(Handler):
    """
    A synchronous style connection wrapper.

    This object's implementation uses OS resources. To ensure they
    are released when the object is no longer in use, make sure that
    object operations are enclosed in a try block and that close() is
    always executed on exit.
    """

    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
        """
        Open a connection to ``url`` and block until the peer has responded.

        @param url: Address to connect to.
        @param timeout: Default timeout for blocking operations; 60 is used
            when the value is falsy.
        @param container: Container to use; a new one is created when falsy.
        @param ssl_domain: Passed through to the container's connect().
        @param heartbeat: Passed through to the container's connect().
        @param kwargs: Further keyword arguments for the container's connect().
        """
        self.disconnected = False
        self.timeout = timeout or 60
        self.container = container or Container()
        self.container.timeout = self.timeout
        self.container.start()
        self.url = Url(url).defaults()
        self.conn = None
        self.closing = False
        failed = True
        try:
            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False,
                                               heartbeat=heartbeat, **kwargs)
            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                      msg="Opening connection")
            failed = False
        finally:
            # Release the half-open connection if opening did not complete.
            if failed and self.conn:
                self.close()

    def create_sender(self, address, handler=None, name=None, options=None):
        """Create a sender link to ``address`` and return it wrapped in a BlockingSender."""
        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler,
                                                                 options=options))

    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
        """
        Create a receiver link from ``address`` and return it wrapped in a BlockingReceiver.

        When ``handler`` is given it receives the link's events, no Fetcher is
        created, and the initial credit defaults to 1. Otherwise a Fetcher is
        created with ``credit`` as its prefetch and becomes the link's handler.
        """
        prefetch = credit
        if handler:
            fetcher = None
            if prefetch is None:
                prefetch = 1
        else:
            fetcher = Fetcher(self, credit)
        return BlockingReceiver(
            self,
            self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher,
                                           options=options), fetcher, credit=prefetch)

    def close(self):
        """
        Close the connection and release the container.

        Calling close() again after the first call is a no-op.
        """
        # TODO: provide stronger interrupt protection on cleanup. See PEP 419
        if self.closing:
            return
        self.closing = True
        self.container.errors = []
        try:
            if self.conn:
                self.conn.close()
                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                          msg="Closing connection")
        finally:
            # conn may be None; skip free() in that case so the remaining
            # cleanup below still runs instead of failing with AttributeError.
            if self.conn is not None:
                self.conn.free()
            # Nothing left to block on. Allow reactor to clean up.
            self.run()
            self.conn = None
            self.container.global_handler = None  # break circular ref: container to cadapter.on_error
            pn_collector_release(pn_reactor_collector(self.container._impl))  # straggling event may keep reactor alive
            self.container = None

    def _is_closed(self):
        """Return a non-zero value when the connection is closed locally or remotely."""
        return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)

    def run(self):
        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
        while self.container.process(): pass
        self.container.stop()
        self.container.process()

    def wait(self, condition, timeout=False, msg=None):
        """
        Call process until condition() is true.

        @param condition: Callable polled between calls to the container's process().
        @param timeout: False selects the connection's default timeout, None
            waits without a deadline, any other value is the timeout to use.
        @param msg: Text appended to the Timeout message.

        Raises Timeout when the deadline passes first, and ConnectionException
        when the transport was disconnected while the connection was not closed.
        """
        if timeout is False:
            timeout = self.timeout
        if timeout is None:
            while not condition() and not self.disconnected:
                self.container.process()
        else:
            # Temporarily apply the requested timeout to the container too.
            container_timeout = self.container.timeout
            self.container.timeout = timeout
            try:
                deadline = time.time() + timeout
                while not condition() and not self.disconnected:
                    self.container.process()
                    if deadline < time.time():
                        txt = "Connection %s timed out" % self.url
                        if msg: txt += ": " + msg
                        raise Timeout(txt)
            finally:
                self.container.timeout = container_timeout
        if self.disconnected or self._is_closed():
            self.container.stop()
            self.conn.handler = None  # break cyclical reference
        if self.disconnected and not self._is_closed():
            raise ConnectionException(
                "Connection %s disconnected: %s" % (self.url, self.disconnected))

    def on_connection_remote_close(self, event):
        """Close our end if still active; raise ConnectionClosed unless we are closing."""
        if event.connection.state & Endpoint.LOCAL_ACTIVE:
            event.connection.close()
            if not self.closing:
                raise ConnectionClosed(event.connection)

    def on_transport_tail_closed(self, event):
        """Treat a closed transport tail like a closed transport."""
        self.on_transport_closed(event)

    def on_transport_head_closed(self, event):
        """Treat a closed transport head like a closed transport."""
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """Record the transport's condition, or "unknown", as the disconnect reason."""
        self.disconnected = event.transport.condition or "unknown"
336
class AtomicCount(object):
    """Counter whose increments are serialised by a lock."""

    def __init__(self, start=0, step=1):
        """Thread-safe atomic counter. Start at start, increment by step."""
        self.count, self.step = start, step
        self.lock = threading.Lock()

    def next(self):
        """Get the next value"""
        # The with-statement releases the lock even if the increment raises,
        # so a failed call cannot leave the counter locked forever.
        with self.lock:
            self.count += self.step
            return self.count
351
class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.
    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    # Counter shared by all instances; supplies the correlation id of each request.
    correlation_id = AtomicCount()

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many requests
        to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Successive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.connection = connection
        self.address = address
        self.sender = self.connection.create_sender(self.address)
        # dynamic=true generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
        self.response = None

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set,
            the request's own address must be set; otherwise ValueError is raised.
        """
        if not (self.address or request.address):
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        expected_id = str(self.correlation_id.next())
        request.correlation_id = expected_id
        self.sender.send(request)

        def response_matches():
            # Truthy once a response carrying our correlation id has arrived.
            return self.response and (self.response.correlation_id == expected_id)

        self.connection.wait(response_matches, msg="Waiting for response")
        reply = self.response
        self.response = None  # Ready for next response.
        self.receiver.flow(1)  # Set up credit for the next response.
        return reply

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        return self.receiver.remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        self.connection.container.yield_()  # Wake up the wait() loop to handle the message.
412