1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Duplicity specific but otherwise generic threading interfaces and
24 utilities.
25
26 (Not called "threading" because we do not want to conflict with
27 the standard threading module, and absolute imports require
28 at least python 2.5.)
29 """
30
# Cleared as soon as either threading-related import below has to fall
# back to its "dummy" stand-in module.
_threading_supported = True

# Low-level thread primitives; fall back to dummy_thread when the real
# module cannot be imported.
try:
    import thread
except ImportError:
    import dummy_thread as thread
    _threading_supported = False

# High-level threading API, with the same fallback strategy.
try:
    import threading
except ImportError:
    import dummy_threading as threading
    _threading_supported = False

import sys

from duplicity import errors
48
def threading_supported():
    """
    Returns whether threading is supported on the system we are
    running on.

    The answer is the module-level _threading_supported flag.
    """
    return _threading_supported
55
# NOTE(review): the def line of this function was missing from the
# reviewed text; the function name is reconstructed -- confirm it
# against the callers.
def require_threading(reason=None):
    """
    Assert that threading is required for operation to continue. Raise
    an appropriate exception if this is not the case.

    reason - Optional description of why threading is required. It is
             used for error reporting in case threading is not
             supported; a placeholder is used when it is None.

    Raises errors.NotSupported if threading is not supported.
    """
    if not threading_supported():
        if reason is None:
            reason = "(no reason given)"
        # Adjacent literals are joined before the % formatting applies,
        # so the reason is substituted into the complete message.
        raise errors.NotSupported("threading was needed because [%s], but "
                                  "is not supported by the python "
                                  "interpreter" % (reason,))
71
# NOTE(review): the def line of this function was missing from the
# reviewed text; the function name is reconstructed -- confirm it
# against the callers.
def thread_module():
    """
    Returns the thread module, or dummy_thread if threading is not
    supported.
    """
    return thread
78
# NOTE(review): the def line of this function was missing from the
# reviewed text; the function name is reconstructed -- confirm it
# against the callers.
def threading_module():
    """
    Returns the threading module, or dummy_threading if threading is
    not supported.
    """
    return threading
85
def with_lock(lock, fn):
    """
    Call fn with lock acquired. Guarantee that lock is released upon
    the return of fn.

    lock - Anything responding to acquire() and release(); it does
           not have to be an actual lock.
    fn   - Callable taking no arguments.

    Returns the value returned by fn, or raises the exception raised
    by fn.
    """
    lock.acquire()

    # The finally clause releases the lock on both the normal and the
    # exceptional exit of fn.
    try:
        return fn()
    finally:
        lock.release()
103
def interruptably_wait(cv, waitFor):
    """
    cv      - The threading.Condition instance to wait on
    waitFor - Callable returning a boolean to indicate whether
              the criteria being waited on has been satisfied.

    Perform a wait on a condition such that it is keyboard
    interruptable when done in the main thread. Due to Python
    limitations as of <= 2.5, lock acquisition and condition waits
    are not interruptable when performed in the main thread.

    Currently, this comes at a cost of additional CPU use, compared
    to a normal wait. Future implementations may be more efficient if
    the underlying python supports it.

    The condition must be acquired.

    This function should only be used on conditions that are never
    expected to be acquired for extended periods of time, or the
    lock-acquire of the underlying condition could cause an
    uninterruptable state despite the efforts of this function.

    There is no equivalent for acquiring a lock, as that cannot be
    done efficiently.

    Example:

    Instead of:

      cv.acquire()
      while not thing_done:
        cv.wait(someTimeout)
      cv.release()

    do:

      cv.acquire()
      interruptably_wait(cv, lambda: thing_done)
      cv.release()

    """
    # Wait in short timed slices rather than blocking indefinitely, and
    # re-check the criteria after every slice.
    while not waitFor():
        cv.wait(0.1)
154
156 """
157 Splits the act of calling the given function into one front-end
158 part for waiting on the result, and a back-end part for performing
159 the work in another thread.
160
161 Returns (waiter, caller) where waiter is a function to be called
162 in order to wait for the results of an asynchronous invokation of
163 fn to complete, returning fn's result or propagating it's
164 exception.
165
166 Caller is the function to call in a background thread in order to
167 execute fn asynchronously. Caller will return (success, waiter)
168 where success is a boolean indicating whether the function
169 suceeded (did NOT raise an exception), and waiter is the waiter
170 that was originally returned by the call to async_split().
171 """
172
173
174
175
176
177
178
179
180
181 cv = threading.Condition()
182 state = { 'done': False,
183 'error': None,
184 'trace': None,
185 'value': None }
186
187 def waiter():
188 cv.acquire()
189 try:
190 interruptably_wait(cv, lambda: state['done'])
191
192 if state['error'] is None:
193 return state['value']
194 else:
195 raise state['error'], None, state['trace']
196 finally:
197 cv.release()
198
199 def caller():
200 try:
201 value = fn()
202
203 cv.acquire()
204 state['done'] = True
205 state['value'] = value
206 cv.notify()
207 cv.release()
208
209 return (True, waiter)
210 except Exception, e:
211 cv.acquire()
212 state['done'] = True
213 state['error'] = e
214 state['trace'] = sys.exc_info()[2]
215 cv.notify()
216 cv.release()
217
218 return (False, waiter)
219
220 return (waiter, caller)
221
class Value:
    """
    A thread-safe container of a reference to an object (but not the
    object itself).

    In particular this means it is safe to:

      value.set(1)

    But unsafe to:

      value.get()['key'] = value

    Where the latter must be done using something like:

      def _setprop():
        value.get()['key'] = value

      with_lock(value, _setprop)

    Operations such as increments are best done as:

      value.transform(lambda val: val + 1)
    """
246
248 """
249 Initialuze with the given value.
250 """
251 self.__value = value
252
253 self.__cv = threading.Condition()
254
256 """
257 Returns the value protected by this Value.
258 """
259 return with_lock(self.__cv, lambda: self.__value)
260
261 - def set(self, value):
262 """
263 Resets the value protected by this Value.
264 """
265 def _set():
266 self.__value = value
267
268 with_lock(self.__cv, _set)
269
286
287 return with_lock(self.cv, _transform)
288
290 """
291 Acquire this Value for mutually exclusive access. Only ever
292 needed when calling code must perform operations that cannot
293 be done with get(), set() or transform().
294 """
295 self.__cv.acquire()
296
298 """
299 Release this Value for mutually exclusive access.
300 """
301 self.__cv.release()
302