Profile image for Ian Lewis IanLewis
An updated version of the Mapper class written here: http://code.google.com/appengine...

This version fixes three problems in the original: the check that decides whether to run _batch_write() was indented incorrectly, _batch_write() was never called for the final page of results, and google.appengine.ext.db was not imported.
Language
Python
Tags
appengine datastore deferred mapper task queue

Appengine Mapper class

from google.appengine.ext import db

from google.appengine.ext import deferred
from google.appengine.runtime import DeadlineExceededError


class Mapper(object):
    """Runs map() over every entity of KIND, batching the resulting writes.

    Entities are visited in __key__ order. When the request deadline is
    hit, pending writes are flushed and a deferred task is queued that
    continues after the last entity whose results were collected.

    Subclasses set KIND (and optionally FILTERS), override map(), and may
    override finish().
    """

    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None

    # Subclasses can replace this with a list of (property, value) tuples to
    # filter by. Rebind it in the subclass rather than mutating it in place:
    # this list object is shared by every class that does not override it.
    FILTERS = []

    def __init__(self):
        # Entities queued for the next batched put / delete.
        self.to_put = []
        self.to_delete = []

    def map(self, entity):
        """Updates a single entity.

        Implementers should return a tuple containing two iterables
        (to_update, to_delete).
        """
        return ([], [])

    def finish(self):
        """Called when the mapper has finished, to allow for any final work.

        The mapper has already flushed its pending writes before calling
        this hook, so overriding it without calling the base implementation
        does not lose data. The base implementation flushes again so that
        anything a subclass queues in to_put / to_delete before delegating
        here is still written.
        """
        self._batch_write()

    def get_query(self):
        """Returns a query over KIND with FILTERS applied, ordered by key."""
        q = self.KIND.all()
        for prop, value in self.FILTERS:
            q.filter("%s =" % prop, value)
        # Key ordering is what lets _continue() resume with "__key__ >".
        q.order("__key__")
        return q

    def run(self, batch_size=100):
        """Starts the mapper running.

        batch_size is the number of mapped entities between batch writes.
        It must be non-zero (it is used as a modulus); a positive integer
        is expected.
        """
        self._continue(None, batch_size)

    def _batch_write(self):
        """Writes pending updates and deletes, then clears the pending lists.

        Each list is cleared only after its datastore call returns, so a
        call that raises leaves the items queued for a later retry.
        """
        if self.to_put:
            db.put(self.to_put)
            self.to_put = []
        if self.to_delete:
            db.delete(self.to_delete)
            self.to_delete = []

    def _continue(self, start_key, batch_size):
        """Processes entities after start_key until done or out of time.

        start_key is None on the first run (see run()); on a resumed run
        it is the key of the last entity whose map() results were
        collected.
        """
        q = self.get_query()
        # If we're resuming, pick up where we left off last time.
        if start_key is not None:
            q.filter("__key__ >", start_key)
        # Keep updating records until we run out of time.
        try:
            # Steps over the results, returning each entity and its index.
            for i, entity in enumerate(q):
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                # Record the last entity we processed *before* flushing: if
                # the deadline fires during the batch write, the handler
                # below retries the write, and resuming from the previous
                # key would map this entity a second time.
                start_key = entity.key()
                # Do updates and deletes in batches.
                if (i + 1) % batch_size == 0:
                    self._batch_write()
            # Flush the final partial batch here, inside the try, so it
            # does not depend on finish() being left un-overridden and so a
            # deadline during this write is resumed like any other.
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, start_key, batch_size)
            return
        self.finish()

Comments