Package recsql :: Module sqlarray
[hide private]
[frames] | no frames]

Source Code for Module recsql.sqlarray

  1  # $Id: sqlarray.py 3345 2009-04-17 23:12:37Z oliver $ 
  2  # Copyright (C) 2009 Oliver Beckstein <orbeckst@gmail.com> 
  3  # Released under the GNU Public License, version 3 or higher (your choice) 
  4   
  5  """ 
  6  :mod:`sqlarray` --- Implementation of :class:`SQLarray` 
  7  ======================================================= 
  8   
  9  :class:`SQLarray` is a thin wrapper around pysqlite SQL tables. The main 
 10  features are that ``SELECT`` queries can return ``numpy.recarrays`` and the 
 11  :meth:`SQLarray.selection` method returns a new :class:`SQLarray` instance.   
 12   
 13  numpy arrays can be stored in sql fields which allows advanced table 
 14  aggregate functions such as ``histogram``. 
 15   
 16  A number of additional SQL functions are defined. 
 17   
 18  :TODO: 
 19     * Make object saveable (i.e. store the database on disk instead of 
 20       memory or dump the memory db and provide a load() method) 
 21     * Use hooks for the pickling protocol to make this transparent.  
 22   
 23  .. SeeAlso:: PyTables_ is a high-performance interface to table data.  
 24   
 25  .. _PyTables: http://www.pytables.org 
 26   
 27  Module content 
 28  -------------- 
 29  .. See the autogenerated content in the online docs or the source code. 
 30   
 31  """ 
 32   
 33  import os.path 
 34  import re 
 35  try: 
 36      from pysqlite2 import dbapi2 as sqlite     # ... all development was with pysqlite2 
 37  except ImportError: 
 38      from sqlite3 import dbapi2 as sqlite       # I hope we are compatible with sqlite3 
 39  import numpy 
 40  from sqlutil import adapt_numpyarray, convert_numpyarray,\ 
 41      adapt_object, convert_object 
 42  from rest_table import Table2array 
 43   
 44  sqlite.register_adapter(numpy.ndarray,adapt_numpyarray)  # adapters: applied to Python values of the given type when they are bound as SQL parameters
 45  sqlite.register_adapter(numpy.recarray,adapt_numpyarray) 
 46  sqlite.register_adapter(numpy.core.records.recarray,adapt_numpyarray) 
     # tuples and lists go through the generic object adapter from sqlutil
     # (the serialisation format is defined there and is not visible in this file)
 47  sqlite.register_adapter(tuple,adapt_object) 
 48  sqlite.register_adapter(list,adapt_object) 
     # converters: applied to values read from columns whose declared type is
     # "NumpyArray" / "Object"; this needs detect_types=PARSE_DECLTYPES on the
     # connection, which SQLarray.__init__ sets for connections it creates itself
 49  sqlite.register_converter("NumpyArray", convert_numpyarray) 
 50  sqlite.register_converter("Object", convert_object) 
 51   
 52   
class SQLarray(object):
    """A SQL table that returns (mostly) rec arrays.

    .. method:: SQLarray([name[,records[,filename[,columns[,cachesize=5,connection=None,is_tmp=False]]]]])

    :Arguments:
       name
          table name (can be referred to as '__self__' in SQL queries)
       records
          numpy record array that describes the layout and initializes the
          table OR any iterable (and then columns must be set, too) OR a string
          that contains a single, *simple reStructured text table* (and the
          table name is set from the table name in the reST table).
          If ``None`` then simply associate with existing table name.
       filename
          Alternatively to *records*, read a reStructured table from *filename*.
       columns
          sequence of column names (only used if records does not have
          attribute dtype.names) [``None``]
       cachesize
          number of (query, result) pairs that are cached [5]
       connection
          If not ``None``, reuse this connection; this adds a new table to the
          same database, which allows more complicated queries with
          cross-joins. The table's connection is available as the attribute
          T.connection. [``None``]
       is_tmp
          ``True``: create a tmp table; ``False``: regular table in db [``False``]

    :Bugs:
       * :exc:`InterfaceError`: *Error binding parameter 0 - probably unsupported type*

         In this case the recarray contained types such as ``numpy.int64``
         that are not understood by sqlite. Either convert the data manually
         (by setting the numpy dtypes yourself on the recarray), or better:
         feed a simple list of tuples ("records") to this class in *records*.
         Make sure that these tuples only contain standard python types.
         Together with *records* you will also have to supply the names of
         the data columns in the keyword argument *columns*.

         If you are reading from a file then it might be simpler to
         use :func:`recsql.sqlarray.SQLarray_fromfile`.

    .. warning::
       Table and column names are interpolated into SQL without any
       sanitizing; never feed untrusted input to this class.
    """

    tmp_table_name = '__tmp_merge_table'  # reserved name (see merge())

    def __init__(self, name=None, records=None, filename=None, columns=None,
                 cachesize=5, connection=None, is_tmp=False, **kwargs):
        """Build the SQL table from a numpy record array.

        Additional keyword arguments are passed on to the reST table parser
        when *records* (or the content of *filename*) is a string.

        :Raises:
           :exc:`ValueError` if *name* is the reserved temporary table name,
           if neither a table name nor a source of records is given, or if
           *name* does not refer to an existing table;
           :exc:`TypeError` if no column names can be determined.
        """
        # Set before anything can fail: __del__ only drops the table once we
        # know that it exists and belongs to this object.
        self._has_table = False

        self.name = str(name)
        if self.name == self.tmp_table_name and not is_tmp:
            raise ValueError('name = %s is reserved, choose another one' % name)
        if connection is None:
            self.connection = sqlite.connect(
                ':memory:',
                detect_types=sqlite.PARSE_DECLTYPES | sqlite.PARSE_COLNAMES)
            self._init_sqlite_functions()   # add additional functions to database
        else:
            self.connection = connection    # use existing connection
        self.cursor = self.connection.cursor()

        if records is None and filename is None:
            if name is None:
                raise ValueError("Provide either an existing table name or a source of records.")
            # associate with existing table
            # SECURITY risk: interpolating name...
            SQL = "SELECT * FROM %(name)s WHERE 0" % vars(self)
            try:
                self.cursor.execute(SQL)
            except sqlite.OperationalError as err:
                message = str(err)
                if 'no such table' in message or 'syntax error' in message:
                    raise ValueError("Provide existing legal 'name' of an existing table not %r"
                                     % self.name)
                raise
            self._has_table = True
            self.columns = tuple([x[0] for x in self.cursor.description])
            self.ncol = len(self.columns)
        else:
            # got records (or a file that contains them)
            if records is None:
                # only reachable with a filename; close the file when done
                with open(filename, 'r') as table_file:
                    records = table_file.read()

            if isinstance(records, str):
                # maybe this is a reST table
                P = Table2array(records, **kwargs)
                P.parse()
                records = P.records      # use plain records and column names instead of
                columns = P.names        # ... a recarray to avoid the dreaded 'InterfaceError'
                self.name = P.tablename  # name provided as 'Table[<tablename>]: ...'

            # dtype.names is None for plain (non-record) arrays; fall back to
            # *columns* in that case, too.
            names = getattr(getattr(records, 'dtype', None), 'names', None)
            if names is None:
                if columns is None:
                    raise TypeError('records must be a recarray or columns should be supplied')
                names = columns          # XXX: no sanity check
            self.columns = names
            self.ncol = len(self.columns)

            # initialize table
            # * input is NOT sanitized and is NOT safe, don't use as CGI...
            # * creation fails if a table with this name already exists
            column_list = ",".join(self.columns)
            if is_tmp:
                create = "CREATE TEMPORARY TABLE "
            else:
                create = "CREATE TABLE "
            self.cursor.execute(create + self.name + " (" + column_list + ")")
            self._has_table = True       # from here on the table is ours to drop

            SQL = "INSERT INTO " + self.name + " (" + column_list + ") " \
                + "VALUES (" + ",".join(self.ncol * ['?']) + ")"
            try:
                # Can fail with 'Error binding parameter 0 - probably
                # unsupported type' if the records contain numpy scalar types
                # that sqlite does not know how to store.
                self.cursor.executemany(SQL, records)
            except Exception:
                import sys
                sys.stderr.write(
                    "ERROR: You are probably feeding a recarray; sqlite does not know how to \n"
                    "       deal with special numpy types such as int32 or int64. Try using the \n"
                    "       recsql.SQLarray_fromfile() function or feed simple records (see docs).")
                raise

        # initialize query cache
        self.__cache = KRingbuffer(cachesize)

    @property
    def recarray(self):
        """Return underlying SQL table as a read-only record array."""
        return self.SELECT('*')

    def merge(self, recarray, columns=None):
        """Merge another recarray with the same columns into this table.

        :Arguments:
           recarray
              numpy record array that describes the layout and initializes the
              table
           columns
              column names; only needed if *recarray* is a plain iterable of
              records [``None``]

        :Returns:
           n           number of inserted rows

        :Raises:
           Raises an exception if duplicate and incompatible data exist
           in the main table and the new one.
        """
        len_before = len(self)
        # CREATE TEMP TABLE in database
        tmparray = SQLarray(self.tmp_table_name, records=recarray, columns=columns,
                            connection=self.connection, is_tmp=True)
        try:
            len_tmp = len(tmparray)
            # insert into main table
            SQL = """INSERT OR ABORT INTO __self__ SELECT * FROM %s""" % self.tmp_table_name
            self.sql(SQL)
            n_inserted = len(self) - len_before
            assert len_tmp == n_inserted
        finally:
            # Dropping the last reference runs __del__, which drops the tmp
            # table. Do it on failure as well, otherwise the reserved table
            # name stays occupied and the next merge() cannot create it.
            del tmparray
        return n_inserted

    def merge_table(self, name):
        """Merge an existing table in the database with the __self__ table.

        Executes as ``'INSERT OR ABORT INTO __self__ SELECT * FROM <name>'``.
        However, this method is probably used less often than the simpler
        :meth:`merge`.

        :Arguments:
           name         name of the table in the database (must be compatible
                        with __self__); interpolated without sanitizing

        :Returns:
           n            number of inserted rows
        """
        l_before = len(self)
        SQL = """INSERT OR ABORT INTO __self__ SELECT * FROM %s""" % name
        self.sql(SQL)
        return len(self) - l_before

    def sql_index(self, index_name, column_names, unique=True):
        """Add a named index on given columns to improve performance.

        :Arguments:
           index_name
              name of the new index
           column_names
              a single column name or a non-empty sequence of column names
           unique
              ``True``: create a ``UNIQUE`` index [``True``]

        :Raises:
           :exc:`ValueError` if *column_names* is empty or not a sequence.
        """
        if isinstance(column_names, str):
            column_names = [column_names]
        try:
            n_columns = len(column_names)
        except TypeError:
            n_columns = 0
        if n_columns == 0:
            raise ValueError("Provide a list of column names for an index.")
        substitutions = {
            'UNIQUE': "UNIQUE" if unique else "",
            'index_name': index_name,
            'table_name': self.name,
            'columns': ",".join(column_names),
        }
        SQL = """CREATE %(UNIQUE)s INDEX %(index_name)s ON %(table_name)s """\
              """(%(columns)s)""" % substitutions
        self.sql(SQL)

    def sql_select(self, fields, *args, **kwargs):
        """Execute a simple SQL ``SELECT`` statement and returns values as new numpy rec array.

        The arguments *fields* and the additional optional arguments
        are simply concatenated with additional SQL statements
        according to the template::

           SELECT <fields> FROM __self__ [args]

        The simplest fields argument is ``"*"``.

        Example:
           Create a recarray in which students with average grade less than
           3 are listed::

             result = T.SELECT("surname, subject, year, avg(grade) AS avg_grade",
                           "WHERE avg_grade < 3", "GROUP BY surname,subject",
                           "ORDER BY avg_grade,surname")

           The resulting SQL would be::

             SELECT surname, subject, year, avg(grade) AS avg_grade FROM __self__
                  WHERE avg_grade < 3
                  GROUP BY surname,subject
                  ORDER BY avg_grade,surname

           Note how one can use aggregate functions such as avg().

           The string *'__self__'* is automatically replaced with the table
           name (``T.name``); this can be used for cartesian products such as ::

              LEFT JOIN __self__ WHERE ...

        .. Note:: See the documentation for :meth:`~SQLarray.sql` for more details on
                  the available keyword arguments and the use of ``?`` parameter
                  interpolation.
        """
        SQL = "SELECT " + str(fields) + " FROM __self__ " + " ".join(args)
        return self.sql(SQL, **kwargs)

    SELECT = sql_select

    def sql(self, SQL, parameters=None, asrecarray=True, cache=True):
        """Execute sql statement.

        :Arguments:
           SQL : string
              Full SQL command; can contain the ``?`` place holder so that values
              supplied with the ``parameters`` keyword can be interpolated using
              the ``pysqlite`` interface.
           parameters : tuple
              Parameters for ``?`` interpolation.
           asrecarray : boolean
              ``True``: return a ``numpy.recarray`` if possible;
              ``False``: return records as a list of tuples. [``True``]
           cache : boolean
              Should the results be cached? Set to ``False`` for large queries to
              avoid memory issues. Queries with ``?`` place holders are never cached.
              [``True``]

        :Returns:
           A record array (or a list of tuples if *asrecarray* is ``False`` or
           the rows cannot be converted); an empty list if there are no rows.

        .. warning::
           There are **no sanity checks** applied to the SQL.

        If possible, the returned list of tuples is turned into a
        numpy record array, otherwise the original list of tuples is
        returned. A :exc:`MemoryError` during the conversion is *not*
        hidden behind this fallback but propagated to the caller.

        The last cachesize queries are cached (for cache=True) and are
        returned directly unless the table has been modified.

        .. Note:: '__self__' is substituted with the table name. See the doc
                  string of the :meth:`SELECT` method for more details.
        """
        SQL = SQL.replace('__self__', self.name)

        # Cache the last N results in a FIFO dict. The requested return type
        # is part of the key, otherwise a recarray cached for
        # asrecarray=True would be handed to an asrecarray=False caller.
        #
        # Never use the cache if place holders are used because then we
        # would return the same result for differing input!
        cache_key = (SQL, bool(asrecarray))
        use_cache = bool(cache) and '?' not in SQL
        if use_cache and cache_key in self.__cache:
            return self.__cache[cache_key]

        c = self.cursor
        if parameters is None:
            c.execute(SQL)               # no sanity checks!
        else:
            c.execute(SQL, parameters)   # no sanity checks; params should be tuple

        if c.rowcount > 0 or 'DELETE' in SQL.upper():
            # table was (potentially) modified
            # rowcount does not reliably change for DELETE so we catch this
            # case manually and invalidate the whole cache
            self.__cache.clear()
        result = c.fetchall()
        if not result:
            return []    # leaving here keeps cache invalidated
        if asrecarray:
            names = [x[0] for x in c.description]   # first elements are column names
            try:
                result = numpy.rec.fromrecords(result, names=names)
            except MemoryError:
                # running out of memory must never look like "not convertible"
                raise
            except Exception:
                pass     # deliberate best effort: keep tuples if we cannot convert
        if use_cache:
            self.__cache.append(cache_key, result)
        return result

    def limits(self, variable):
        """Return minimum and maximum of variable across all rows of data."""
        fields = 'min(%(variable)s), max(%(variable)s)' % {'variable': variable}
        (vmin, vmax), = self.SELECT(fields)
        return vmin, vmax

    def selection(self, SQL, parameters=None, **kwargs):
        """Return a new SQLarray from a SELECT selection.

        This is a very useful method because it allows one to build complicated
        selections and essentially new tables from existing data.

        :Arguments:
           SQL
              either a full ``SELECT ... FROM ...`` statement or only the
              ``WHERE`` clause of a selection on this table; everything after
              the first semicolon is ignored
           parameters
              tuple of values for ``?`` place holders [``None``]
           name
              name of the new table; by default a name is derived from the
              md5 digest of the statement

        Examples::

          s = selection('a > 3')
          s = selection('a > ?', (3,))
          s = selection('SELECT * FROM __self__ WHERE a > ? AND b < ?', (3, 10))
        """
        # TODO: under development
        # - could use VIEW
        # - might be a good idea to use cache=False

        import hashlib   # replaces the 'md5' module, which is gone in Python 3

        # pretty unsafe... I hope the user knows what they are doing
        # - only read data to first semicolon
        # - here should be input scrubbing...
        safe_sql = re.match(r'(?P<SQL>[^;]*)', SQL).group('SQL')

        # DOTALL: a SELECT statement may be spread over several lines
        if re.match(r'\s*SELECT.*FROM', safe_sql, flags=re.IGNORECASE | re.DOTALL):
            _sql = safe_sql
        else:
            # WHERE clause only
            _sql = """SELECT * FROM __self__ WHERE """ + str(safe_sql)
        # (note: MUST replace __self__ before md5!)
        _sql = _sql.replace('__self__', self.name)

        # unique name for table (unless user supplied... which could be 'x;DROP TABLE...')
        newname = kwargs.pop('name', None)
        if newname is None:
            digest_input = _sql if isinstance(_sql, bytes) else _sql.encode('utf-8')
            newname = 'selection_' + hashlib.md5(digest_input).hexdigest()

        # create table directly
        # SECURITY: unsafe tablename !!!! (table names cannot be bound with '?')
        _sql = "CREATE TABLE %s AS " % newname + _sql

        c = self.cursor
        if parameters is None:
            c.execute(_sql)              # no sanity checks!
        else:
            c.execute(_sql, parameters)  # no sanity checks; params should be tuple

        # associate with new table in db
        return SQLarray(newname, None, connection=self.connection)

    def _init_sqlite_functions(self):
        """Add additional SQL functions to the database."""
        import sqlfunctions

        # (SQL name, number of arguments, implementation)
        functions = (
            ("sqrt", 1, sqlfunctions._sqrt),
            ("fformat", 2, sqlfunctions._fformat),
        )
        aggregates = (
            ("std", 1, sqlfunctions._Stdev),
            ("median", 1, sqlfunctions._Median),
            ("array", 1, sqlfunctions._NumpyArray),
            ("histogram", 4, sqlfunctions._NumpyHistogram),
            ("distribution", 4, sqlfunctions._NormedNumpyHistogram),
            ("meanhistogram", 5, sqlfunctions._MeanHistogram),
            ("stdhistogram", 5, sqlfunctions._StdHistogram),
            ("minhistogram", 5, sqlfunctions._MinHistogram),
            ("maxhistogram", 5, sqlfunctions._MaxHistogram),
            ("medianhistogram", 5, sqlfunctions._MedianHistogram),
            ("zscorehistogram", 5, sqlfunctions._ZscoreHistogram),
        )
        for sql_name, n_args, func in functions:
            self.connection.create_function(sql_name, n_args, func)
        for sql_name, n_args, aggregate_class in aggregates:
            self.connection.create_aggregate(sql_name, n_args, aggregate_class)

    def __len__(self):
        """Number of rows in the table."""
        rows = self.sql("SELECT COUNT(*) FROM __self__", asrecarray=False)
        return int(rows[0][0])

    def __del__(self):
        """Delete the underlying SQL table from the database."""
        # A constructor that failed early never owned a table: there is
        # nothing to drop and attributes such as the cursor may be missing.
        if not getattr(self, '_has_table', False):
            return
        try:
            self.sql("""DROP TABLE IF EXISTS __self__""", asrecarray=False, cache=False)
        except sqlite.Error:
            # best effort only, e.g. the connection has been closed already;
            # exceptions cannot propagate out of __del__ anyway
            pass
# Ring buffer (from hop.utilities)
#
# collections.deque has been part of the standard library since Python 2.4,
# so no list-based fallback implementation is needed.
import collections


class Fifo(collections.deque):
    """First-in, first-out queue: :meth:`pop` removes the *oldest* element."""
    pop = collections.deque.popleft


class Ringbuffer(Fifo):
    """Ring buffer of size capacity; new data are appended on the right and
    the oldest data are discarded on the left once the buffer is full.

    :Arguments:
       capacity
          maximum number of elements (must be > 0)
       iterable
          initial content; only the last *capacity* elements are kept [``None``]

    :Raises:
       :exc:`ValueError` if *capacity* is not positive.
    """
    # See http://mail.python.org/pipermail/tutor/2005-March/037149.html.

    def __init__(self, capacity, iterable=None):
        if iterable is None:
            iterable = []
        super(Ringbuffer, self).__init__(iterable)
        # explicit check instead of assert: must also hold under 'python -O',
        # append() would otherwise pop from an empty buffer
        if capacity <= 0:
            raise ValueError("capacity must be > 0, not %r" % (capacity,))
        self.capacity = capacity
        while len(self) > self.capacity:
            super(Ringbuffer, self).pop()   # prune initial loading (oldest first)

    def append(self, x):
        """Add *x* as the newest element, discarding the oldest if full."""
        while len(self) >= self.capacity:
            super(Ringbuffer, self).pop()
        super(Ringbuffer, self).append(x)

    def extend(self, iterable):
        """Append all elements of *iterable* while respecting the capacity."""
        # deque.extend() would grow the buffer beyond its capacity
        for x in iterable:
            self.append(x)

    def __repr__(self):
        return "Ringbuffer(capacity=" + str(self.capacity) + ", " + str(list(self)) + ")"
513 514 515
class KRingbuffer(dict):
    """Ring buffer with key lookup.

    Basically a ringbuffer for the keys and a dict (k,v) that is
    cleaned up to reflect the keys in the Ringbuffer: once more than
    *capacity* distinct keys have been appended, the oldest ones are dropped.

    Entries can only be added with :meth:`append`; item assignment, item
    deletion and :meth:`update` raise :exc:`NotImplementedError`.

    .. Note:: Other inherited ``dict`` mutators (e.g. ``pop`` or
              ``setdefault``) are not overridden here and do not update the
              ring buffer of keys.
    """

    def __init__(self, capacity, *args, **kwargs):
        """Set up a buffer for *capacity* entries.

        Additional arguments are used as for :class:`dict` to provide initial
        content (which is pruned to *capacity* entries).
        """
        super(KRingbuffer, self).__init__(*args, **kwargs)
        self.capacity = capacity
        self.__ringbuffer = Ringbuffer(self.capacity, self.keys())
        self._prune()

    def append(self, k, v):
        """x.append(k,v)

        Store value *v* under key *k*. A key that is already buffered only
        gets its value replaced; it keeps its place in the queue and does not
        take up a second slot.
        """
        if k not in self.__ringbuffer:
            self.__ringbuffer.append(k)
        super(KRingbuffer, self).__setitem__(k, v)
        self._prune()

    def clear(self):
        """Reinitialize the KRingbuffer to empty."""
        self.__ringbuffer = Ringbuffer(self.capacity)
        super(KRingbuffer, self).clear()

    def _prune(self):
        """Primitive way to keep dict in sync with RB."""
        live_keys = set(self.__ringbuffer)
        # collect first: the dict must not change size while it is iterated
        stale_keys = [k for k in self if k not in live_keys]
        for k in stale_keys:
            super(KRingbuffer, self).__delitem__(k)

    def __setitem__(self, k, v):
        raise NotImplementedError('Only append() is supported.')

    def __delitem__(self, k):
        raise NotImplementedError('Only pop() is supported.')

    def update(self, *args, **kwargs):
        raise NotImplementedError('Only append() is supported.')
547
def SQLarray_fromfile(filename, **kwargs):
    """Create a :class:`SQLarray` from *filename*.

    Uses the filename suffix (case-insensitive) to detect the contents:
      rst, txt
          restructured text (see :mod:`recsql.rest_table`)
      csv
          comma-separated (see :mod:`recsql.csv_table`)

    :Arguments:
      *filename*
            name of the file that contains the data with the appropriate
            file extension
      *kwargs*
            - additional arguments for :class:`SQLarray`
            - additional arguments for :class:`recsql.csv_table.Table2array` or
              :class:`recsql.rest_table.Table2array` such as *mode* or
              *autoconvert*.

    :Returns:
       a new :class:`SQLarray`; unless *name* is given, the table is named
       after the table name found by the parser.

    :Raises:
       :exc:`KeyError` if the file extension is not one of the supported ones.
    """
    supported = ('rst', 'txt', 'csv')

    root, ext = os.path.splitext(filename)
    if ext.startswith('.'):
        ext = ext[1:]
    ext = ext.lower()
    # Fail early and with a readable message instead of a bare KeyError
    # from the parser lookup further down.
    if ext not in supported:
        raise KeyError("unsupported file extension %r of %r (supported: %s)"
                       % (ext, filename, ", ".join(supported)))

    import rest_table
    import csv_table

    parsers = {'rst': rest_table.Table2array,
               'txt': rest_table.Table2array,
               'csv': csv_table.Table2array,
               }

    # split off the keyword arguments that are meant for the parser
    _kwnames = ('active', 'mode', 'autoconvert', 'automapping', 'sep')
    kwargsT2a = dict((k, kwargs.pop(k)) for k in _kwnames if k in kwargs)
    kwargsT2a.setdefault('mode', 'singlet')
    kwargsT2a['filename'] = filename

    t = parsers[ext](**kwargsT2a)
    kwargs.setdefault('name', t.tablename)
    kwargs['columns'] = t.names
    kwargs['records'] = t.records    # use records to have sqlite do type conversion
    return SQLarray(**kwargs)
587