Package duplicity :: Module dup_threading
[hide private]
[frames] | no frames]

Source Code for Module duplicity.dup_threading

  1  # -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- 
  2  # 
  3  # Copyright 2002 Ben Escoto <ben@emerose.org> 
  4  # Copyright 2007 Kenneth Loafman <kenneth@loafman.com> 
  5  # 
  6  # This file is part of duplicity. 
  7  # 
  8  # Duplicity is free software; you can redistribute it and/or modify it 
  9  # under the terms of the GNU General Public License as published by the 
 10  # Free Software Foundation; either version 2 of the License, or (at your 
 11  # option) any later version. 
 12  # 
 13  # Duplicity is distributed in the hope that it will be useful, but 
 14  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 16  # General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU General Public License 
 19  # along with duplicity; if not, write to the Free Software Foundation, 
 20  # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 21   
 22  """ 
 23  Duplicity specific but otherwise generic threading interfaces and 
 24  utilities. 
 25   
 26  (Not called "threading" because we do not want to conflict with 
 27  the standard threading module, and absolute imports require 
 28  at least python 2.5.) 
 29  """ 
 30   
 31  _threading_supported = True 
 32   
 33  try: 
 34      import thread 
 35  except ImportError: 
 36      import dummy_thread as thread 
 37      _threading_supported = False 
 38   
 39  try: 
 40      import threading 
 41  except ImportError: 
 42      import dummy_threading as threading 
 43      _threading_supported = False 
 44   
 45  import sys 
 46   
 47  from duplicity import errors 
 48   
49 -def threading_supported():
50 """ 51 Returns whether threading is supported on the system we are 52 running on. 53 """ 54 return _threading_supported
55
56 -def require_threading(reason = None):
57 """ 58 Assert that threading is required for operation to continue. Raise 59 an appropriate exception if this is not the case. 60 61 Reason specifies an optional reason why threading is required, 62 which will be used for error reporting in case threading is not 63 supported. 64 """ 65 if not threading_supported(): 66 if reason is None: 67 reason = "(no reason given)" 68 raise errors.NotSupported("threading was needed because [%s], but "\ 69 "is not supported by the python "\ 70 "interpreter" % (reason,))
71
72 -def thread_module():
73 """ 74 Returns the thread module, or dummy_thread if threading is not 75 supported. 76 """ 77 return thread
78
79 -def threading_module():
80 """ 81 Returns the threading module, or dummy_thread if threading is not 82 supported. 83 """ 84 return threading
85
86 -def with_lock(lock, fn):
87 """ 88 Call fn with lock acquired. Guarantee that lock is released upon 89 the return of fn. 90 91 Returns the value returned by fn, or raises the exception raised 92 by fn. 93 94 (Lock can actually be anything responding to acquire() and 95 release().) 96 """ 97 lock.acquire() 98 99 try: 100 return fn() 101 finally: 102 lock.release()
103
104 -def interruptably_wait(cv, waitFor):
105 """ 106 cv - The threading.Condition instance to wait on 107 test - Callable returning a boolean to indicate whether 108 the criteria being waited on has been satisfied. 109 110 Perform a wait on a condition such that it is keyboard 111 interruptable when done in the main thread. Due to Python 112 limitations as of <= 2.5, lock acquisition and conditions waits 113 are not interruptable when performed in the main thread. 114 115 Currently, this comes at a cost additional CPU use, compared to a 116 normal wait. Future implementations may be more efficient if the 117 underlying python supports it. 118 119 The condition must be acquired. 120 121 This function should only be used on conditions that are never 122 expected to be acquired for extended periods of time, or the 123 lock-acquire of the underlying condition could cause an 124 uninterruptable state despite the efforts of this function. 125 126 There is no equivalent for acquireing a lock, as that cannot be 127 done efficiently. 128 129 Example: 130 131 Instead of: 132 133 cv.acquire() 134 while not thing_done: 135 cv.wait(someTimeout) 136 cv.release() 137 138 do: 139 140 cv.acquire() 141 interruptable_condwait(cv, lambda: thing_done) 142 cv.release() 143 144 """ 145 # We can either poll at some interval, or wait with a short enough 146 # timeout to be practical (i.e., such that it interactively seems 147 # to response semi-immediately to an interrupt). 148 # 149 # Both approaches waste CPU, but the latter approach does not 150 # imply a latency penalty in the common case of a 151 # notify. 152 while not waitFor(): 153 cv.wait(0.1)
154
155 -def async_split(fn):
156 """ 157 Splits the act of calling the given function into one front-end 158 part for waiting on the result, and a back-end part for performing 159 the work in another thread. 160 161 Returns (waiter, caller) where waiter is a function to be called 162 in order to wait for the results of an asynchronous invokation of 163 fn to complete, returning fn's result or propagating it's 164 exception. 165 166 Caller is the function to call in a background thread in order to 167 execute fn asynchronously. Caller will return (success, waiter) 168 where success is a boolean indicating whether the function 169 suceeded (did NOT raise an exception), and waiter is the waiter 170 that was originally returned by the call to async_split(). 171 """ 172 # Implementation notes: 173 # 174 # We use a dictionary to track the state of the asynchronous call, 175 # rather than local variables. This is to get around the way 176 # closures work with respect to local variables in Python. We do 177 # not care about hash lookup overhead since this is intended to be 178 # used for significant amounts of work. 179 180 181 cv = threading.Condition() #@UndefinedVariable 182 state = { 'done': False, 183 'error': None, 184 'trace': None, 185 'value': None } 186 187 def waiter(): 188 cv.acquire() 189 try: 190 interruptably_wait(cv, lambda: state['done']) 191 192 if state['error'] is None: 193 return state['value'] 194 else: 195 raise state['error'], None, state['trace'] 196 finally: 197 cv.release()
198 199 def caller(): 200 try: 201 value = fn() 202 203 cv.acquire() 204 state['done'] = True 205 state['value'] = value 206 cv.notify() 207 cv.release() 208 209 return (True, waiter) 210 except Exception, e: 211 cv.acquire() 212 state['done'] = True 213 state['error'] = e 214 state['trace'] = sys.exc_info()[2] 215 cv.notify() 216 cv.release() 217 218 return (False, waiter) 219 220 return (waiter, caller) 221
222 -class Value:
223 """ 224 A thread-safe container of a reference to an object (but not the 225 object itself). 226 227 In particular this means it is safe to: 228 229 value.set(1) 230 231 But unsafe to: 232 233 value.get()['key'] = value 234 235 Where the latter must be done using something like: 236 237 def _setprop(): 238 value.get()['key'] = value 239 240 with_lock(value, _setprop) 241 242 Operations such as increments are best done as: 243 244 value.transform(lambda val: val + 1) 245 """ 246
247 - def __init__(self, value = None):
248 """ 249 Initialuze with the given value. 250 """ 251 self.__value = value 252 253 self.__cv = threading.Condition() #@UndefinedVariable
254
255 - def get(self):
256 """ 257 Returns the value protected by this Value. 258 """ 259 return with_lock(self.__cv, lambda: self.__value)
260
261 - def set(self, value):
262 """ 263 Resets the value protected by this Value. 264 """ 265 def _set(): 266 self.__value = value
267 268 with_lock(self.__cv, _set)
269
270 - def transform(self, fn):
271 """ 272 Call fn with the current value as the parameter, and reset the 273 value to the return value of fn. 274 275 During the execution of fn, all other access to this Value is 276 prevented. 277 278 If fn raised an exception, the value is not reset. 279 280 Returns the value returned by fn, or raises the exception 281 raised by fn. 282 """ 283 def _transform(): 284 self.__value = fn(self.__value) 285 return self.__value
286 287 return with_lock(self.cv, _transform) 288
289 - def acquire(self):
290 """ 291 Acquire this Value for mutually exclusive access. Only ever 292 needed when calling code must perform operations that cannot 293 be done with get(), set() or transform(). 294 """ 295 self.__cv.acquire()
296
297 - def release(self):
298 """ 299 Release this Value for mutually exclusive access. 300 """ 301 self.__cv.release()
302