Package duplicity :: Module lazy
[hide private]
[frames] | no frames]

Source Code for Module duplicity.lazy

  1  # -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- 
  2  # 
  3  # Copyright 2002 Ben Escoto <ben@emerose.org> 
  4  # Copyright 2007 Kenneth Loafman <kenneth@loafman.com> 
  5  # 
  6  # This file is part of duplicity. 
  7  # 
  8  # Duplicity is free software; you can redistribute it and/or modify it 
  9  # under the terms of the GNU General Public License as published by the 
 10  # Free Software Foundation; either version 2 of the License, or (at your 
 11  # option) any later version. 
 12  # 
 13  # Duplicity is distributed in the hope that it will be useful, but 
 14  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 16  # General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU General Public License 
 19  # along with duplicity; if not, write to the Free Software Foundation, 
 20  # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 21   
 22  """Define some lazy data structures and functions acting on them""" 
 23   
 24  import os 
 25   
 26  from duplicity.static import * #@UnusedWildImport 
 27   
 28   
29 -class Iter:
30 """Hold static methods for the manipulation of lazy iterators""" 31
32 - def filter(predicate, iterator): #@NoSelf
33 """Like filter in a lazy functional programming language""" 34 for i in iterator: 35 if predicate(i): 36 yield i
37
38 - def map(function, iterator): #@NoSelf
39 """Like map in a lazy functional programming language""" 40 for i in iterator: 41 yield function(i) 42
43 - def foreach(function, iterator): #@NoSelf
44 """Run function on each element in iterator""" 45 for i in iterator: 46 function(i) 47
48 - def cat(*iters): #@NoSelf
49 """Lazily concatenate iterators""" 50 for iter in iters: 51 for i in iter: 52 yield i 53
54 - def cat2(iter_of_iters): #@NoSelf
55 """Lazily concatenate iterators, iterated by big iterator""" 56 for iter in iter_of_iters: 57 for i in iter: 58 yield i 59
60 - def empty(iter): #@NoSelf
61 """True if iterator has length 0""" 62 for i in iter: #@UnusedVariable 63 return None 64 return 1 65
66 - def equal(iter1, iter2, verbose = None, operator = lambda x, y: x == y): #@NoSelf
67 """True if iterator 1 has same elements as iterator 2 68 69 Use equality operator, or == if it is unspecified. 70 71 """ 72 for i1 in iter1: 73 try: 74 i2 = iter2.next() 75 except StopIteration: 76 if verbose: 77 print "End when i1 = %s" % (i1,) 78 return None 79 if not operator(i1, i2): 80 if verbose: 81 print "%s not equal to %s" % (i1, i2) 82 return None 83 try: 84 i2 = iter2.next() 85 except StopIteration: 86 return 1 87 if verbose: 88 print "End when i2 = %s" % (i2,) 89 return None 90
91 - def Or(iter): #@NoSelf
92 """True if any element in iterator is true. Short circuiting""" 93 i = None 94 for i in iter: 95 if i: 96 return i 97 return i 98
99 - def And(iter): #@NoSelf
100 """True if all elements in iterator are true. Short circuiting""" 101 i = 1 102 for i in iter: 103 if not i: 104 return i 105 return i 106
107 - def len(iter): #@NoSelf
108 """Return length of iterator""" 109 i = 0 110 while 1: 111 try: 112 iter.next() 113 except StopIteration: 114 return i 115 i = i+1 116
117 - def foldr(f, default, iter): #@NoSelf
118 """foldr the "fundamental list recursion operator"?""" 119 try: 120 next = iter.next() 121 except StopIteration: 122 return default 123 return f(next, Iter.foldr(f, default, iter)) 124
125 - def foldl(f, default, iter): #@NoSelf
126 """the fundamental list iteration operator..""" 127 while 1: 128 try: 129 next = iter.next() 130 except StopIteration: 131 return default 132 default = f(default, next) 133
134 - def multiplex(iter, num_of_forks, final_func = None, closing_func = None): #@NoSelf
135 """Split a single iterater into a number of streams 136 137 The return val will be a list with length num_of_forks, each 138 of which will be an iterator like iter. final_func is the 139 function that will be called on each element in iter just as 140 it is being removed from the buffer. closing_func is called 141 when all the streams are finished. 142 143 """ 144 if num_of_forks == 2 and not final_func and not closing_func: 145 im2 = IterMultiplex2(iter) 146 return (im2.yielda(), im2.yieldb()) 147 if not final_func: 148 final_func = lambda i: None 149 if not closing_func: 150 closing_func = lambda: None 151 152 # buffer is a list of elements that some iterators need and others 153 # don't 154 buffer = [] 155 156 # buffer[forkposition[i]] is the next element yieled by iterator 157 # i. If it is -1, yield from the original iter 158 starting_forkposition = [-1] * num_of_forks 159 forkposition = starting_forkposition[:] 160 called_closing_func = [None] 161 162 def get_next(fork_num): 163 """Return the next element requested by fork_num""" 164 if forkposition[fork_num] == -1: 165 try: 166 buffer.insert(0, iter.next()) 167 except StopIteration: 168 # call closing_func if necessary 169 if (forkposition == starting_forkposition and 170 not called_closing_func[0]): 171 closing_func() 172 called_closing_func[0] = None 173 raise StopIteration 174 for i in range(num_of_forks): 175 forkposition[i] += 1 176 177 return_val = buffer[forkposition[fork_num]] 178 forkposition[fork_num] -= 1 179 180 blen = len(buffer) 181 if not (blen-1) in forkposition: 182 # Last position in buffer no longer needed 183 assert forkposition[fork_num] == blen-2 184 final_func(buffer[blen-1]) 185 del buffer[blen-1] 186 return return_val 187 188 def make_iterator(fork_num): 189 while(1): 190 yield get_next(fork_num) 191 192 return tuple(map(make_iterator, range(num_of_forks))) 193 194 MakeStatic(Iter) 195 196
197 -class IterMultiplex2:
198 """Multiplex an iterator into 2 parts 199 200 This is a special optimized case of the Iter.multiplex function, 201 used when there is no closing_func or final_func, and we only want 202 to split it into 2. By profiling, this is a time sensitive class. 203 204 """
205 - def __init__(self, iter):
206 self.a_leading_by = 0 # How many places a is ahead of b 207 self.buffer = [] 208 self.iter = iter
209
210 - def yielda(self):
211 """Return first iterator""" 212 buf, iter = self.buffer, self.iter 213 while(1): 214 if self.a_leading_by >= 0: 215 # a is in front, add new element 216 elem = iter.next() # exception will be passed 217 buf.append(elem) 218 else: 219 # b is in front, subtract an element 220 elem = buf.pop(0) 221 self.a_leading_by += 1 222 yield elem
223
224 - def yieldb(self):
225 """Return second iterator""" 226 buf, iter = self.buffer, self.iter 227 while(1): 228 if self.a_leading_by <= 0: 229 # b is in front, add new element 230 elem = iter.next() # exception will be passed 231 buf.append(elem) 232 else: 233 # a is in front, subtract an element 234 elem = buf.pop(0) 235 self.a_leading_by -= 1 236 yield elem
237 238
239 -class IterTreeReducer:
240 """Tree style reducer object for iterator - stolen from rdiff-backup 241 242 The indicies of a RORPIter form a tree type structure. This class 243 can be used on each element of an iter in sequence and the result 244 will be as if the corresponding tree was reduced. This tries to 245 bridge the gap between the tree nature of directories, and the 246 iterator nature of the connection between hosts and the temporal 247 order in which the files are processed. 248 249 This will usually be used by subclassing ITRBranch below and then 250 call the initializer below with the new class. 251 252 """
253 - def __init__(self, branch_class, branch_args):
254 """ITR initializer""" 255 self.branch_class = branch_class 256 self.branch_args = branch_args 257 self.index = None 258 self.root_branch = branch_class(*branch_args) 259 self.branches = [self.root_branch]
260
261 - def finish_branches(self, index):
262 """Run Finish() on all branches index has passed 263 264 When we pass out of a branch, delete it and process it with 265 the parent. The innermost branches will be the last in the 266 list. Return None if we are out of the entire tree, and 1 267 otherwise. 268 269 """ 270 branches = self.branches 271 while 1: 272 to_be_finished = branches[-1] 273 base_index = to_be_finished.base_index 274 if base_index != index[:len(base_index)]: 275 # out of the tree, finish with to_be_finished 276 to_be_finished.call_end_proc() 277 del branches[-1] 278 if not branches: 279 return None 280 branches[-1].branch_process(to_be_finished) 281 else: 282 return 1
283
284 - def add_branch(self):
285 """Return branch of type self.branch_class, add to branch list""" 286 branch = self.branch_class(*self.branch_args) 287 self.branches.append(branch) 288 return branch
289
290 - def process_w_branch(self, index, branch, args):
291 """Run start_process on latest branch""" 292 robust.check_common_error(branch.on_error, 293 branch.start_process, args) 294 if not branch.caught_exception: 295 branch.start_successful = 1 296 branch.base_index = index
297
298 - def Finish(self):
299 """Call at end of sequence to tie everything up""" 300 while 1: 301 to_be_finished = self.branches.pop() 302 to_be_finished.call_end_proc() 303 if not self.branches: 304 break 305 self.branches[-1].branch_process(to_be_finished)
306
307 - def __call__(self, *args):
308 """Process args, where args[0] is current position in iterator 309 310 Returns true if args successfully processed, false if index is 311 not in the current tree and thus the final result is 312 available. 313 314 Also note below we set self.index after doing the necessary 315 start processing, in case there is a crash in the middle. 316 317 """ 318 index = args[0] 319 if self.index is None: 320 self.process_w_branch(index, self.root_branch, args) 321 self.index = index 322 return 1 323 324 if index <= self.index: 325 log.Warn(_("Warning: oldindex %s >= newindex %s") % 326 (self.index, index)) 327 return 1 328 329 if self.finish_branches(index) is None: 330 return None # We are no longer in the main tree 331 last_branch = self.branches[-1] 332 if last_branch.start_successful: 333 if last_branch.can_fast_process(*args): 334 robust.check_common_error(last_branch.on_error, 335 last_branch.fast_process, args) 336 else: 337 branch = self.add_branch() 338 self.process_w_branch(index, branch, args) 339 else: 340 last_branch.log_prev_error(index) 341 342 self.index = index 343 return 1
344 345
346 -class ITRBranch:
347 """Helper class for IterTreeReducer above 348 349 There are five stub functions below: start_process, end_process, 350 branch_process, fast_process, and can_fast_process. A class that 351 subclasses this one will probably fill in these functions to do 352 more. 353 354 """ 355 base_index = index = None 356 finished = None 357 caught_exception = start_successful = None 358
359 - def call_end_proc(self):
360 """Runs the end_process on self, checking for errors""" 361 if self.finished or not self.start_successful: 362 self.caught_exception = 1 363 364 # Since all end_process does is copy over attributes, might as 365 # well run it even if we did get errors earlier. 366 robust.check_common_error(self.on_error, self.end_process) 367 368 self.finished = 1
369
370 - def start_process(self, *args):
371 """Do some initial processing (stub)""" 372 pass
373
374 - def end_process(self):
375 """Do any final processing before leaving branch (stub)""" 376 pass
377
378 - def branch_process(self, branch):
379 """Process a branch right after it is finished (stub)""" 380 assert branch.finished 381 pass
382
383 - def can_fast_process(self, *args):
384 """True if object can be processed without new branch (stub)""" 385 return None
386
387 - def fast_process(self, *args):
388 """Process args without new child branch (stub)""" 389 pass
390
391 - def on_error(self, exc, *args):
392 """This is run on any exception in start/end-process""" 393 self.caught_exception = 1 394 if args and args[0] and isinstance(args[0], tuple): 395 filename = os.path.join(*args[0]) 396 elif self.index: 397 filename = os.path.join(*self.index) 398 else: 399 filename = "." 400 log.Warn(_("Error '%s' processing %s") % (exc, filename))
401
402 - def log_prev_error(self, index):
403 """Call function if no pending exception""" 404 if not index: 405 index_str = "." 406 else: 407 index_str = os.path.join(*index) 408 log.Warn(_("Skipping %s because of previous error") % index_str)
409 410 411 from duplicity import log 412 from duplicity import robust 413