- An updated version of the Mapper class written here: http://code.google.com/appengine...
This version fixes three problems in the original: the check that decides whether to run _batch_write() was indented incorrectly, _batch_write() was never called for the final page of results, and google.appengine.ext.db was not imported.
Appengine Mapper class
from google.appengine.ext import db
from google.appengine.ext import deferred
from google.appengine.runtime import DeadlineExceededError

class Mapper(object):
    """Applies an operation to every entity returned by a datastore query.

    Subclasses set KIND (and optionally FILTERS) and override map(). Calling
    run() walks the query in ``__key__`` order, collects the puts and deletes
    that map() returns, and writes them in batches. If DeadlineExceededError
    is raised while iterating, pending writes are flushed and the remaining
    work is re-queued with deferred.defer(), resuming after the last entity
    that was processed.
    """

    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None

    # Subclasses can replace this with a list of (property, value) tuples to
    # filter by.
    FILTERS = []

    def __init__(self):
        # Items returned by map() that are waiting for the next batch write.
        self.to_put = []
        self.to_delete = []

    def map(self, entity):
        """Updates a single entity.

        Implementers should return a tuple containing two iterables
        (to_update, to_delete).
        """
        return ([], [])

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done.

        The base implementation flushes anything still pending, so overrides
        that queue extra puts/deletes can call it to have them written.
        """
        self._batch_write()

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied."""
        q = self.KIND.all()
        for prop, value in self.FILTERS:
            q.filter("%s =" % prop, value)
        # Key ordering is what lets _continue() resume with "__key__ >".
        q.order("__key__")
        return q

    def run(self, batch_size=100):
        """Starts the mapper running.

        Args:
          batch_size: number of entities to map between batch writes.
        """
        self._continue(None, batch_size)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            db.put(self.to_put)
            self.to_put = []
        if self.to_delete:
            db.delete(self.to_delete)
            self.to_delete = []

    def _continue(self, start_key, batch_size):
        """Maps entities after start_key until the query is exhausted or time runs out.

        Args:
          start_key: key of the last entity already processed, or None to
            start from the beginning of the query.
          batch_size: number of entities to map between batch writes.
        """
        q = self.get_query()
        # If we're resuming, pick up where we left off last time.
        if start_key:
            q.filter("__key__ >", start_key)
        # Keep updating records until we run out of time.
        try:
            # Steps over the results, returning each entity and its index.
            for i, entity in enumerate(q):
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                # Record the last entity we processed *before* flushing. Its
                # results are already queued, so if the deadline hits during
                # the write below, the handler writes them and the resumed
                # task must not map this entity a second time.
                start_key = entity.key()
                # Do updates and deletes in batches.
                if (i + 1) % batch_size == 0:
                    self._batch_write()
            # Write the final partial batch here rather than relying on
            # finish(): subclasses may override finish() without calling the
            # base implementation, and inside the try a deadline during this
            # write is still handled by re-queueing.
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, start_key, batch_size)
            return
        self.finish()
Comments
Sign in to leave a comment.

