1
2
3
4
5 """
6 :mod:`sqlarray` --- Implementation of :class:`SQLarray`
7 =======================================================
8
9 :class:`SQLarray` is a thin wrapper around pysqlite SQL tables. The main
features are that ``SELECT`` queries can return ``numpy.recarrays`` and the
11 :meth:`SQLarray.selection` method returns a new :class:`SQLarray` instance.
12
13 numpy arrays can be stored in sql fields which allows advanced table
14 aggregate functions such as ``histogram``.
15
16 A number of additional SQL functions are defined.
17
18 :TODO:
* Make object saveable (i.e. store the database on disk instead of
  memory, or dump the in-memory db and provide a load() method).
21 * Use hooks for the pickling protocol to make this transparent.
22
23 .. SeeAlso:: PyTables_ is a high-performance interface to table data.
24
25 .. _PyTables: http://www.pytables.org
26
27 Module content
28 --------------
29 .. See the autogenerated content in the online docs or the source code.
30
31 """
32
33 import os.path
34 import re
35 try:
36 from pysqlite2 import dbapi2 as sqlite
37 except ImportError:
38 from sqlite3 import dbapi2 as sqlite
39 import numpy
40 from sqlutil import adapt_numpyarray, convert_numpyarray,\
41 adapt_object, convert_object
42 from rest_table import Table2array
43
# Teach sqlite how to store numpy arrays and plain Python sequences, and how to
# turn columns declared as "NumpyArray" / "Object" back into Python objects
# (connections created by SQLarray use PARSE_DECLTYPES | PARSE_COLNAMES).
# Adapters are looked up by the exact type of the value, so ndarray and
# recarray need separate registrations. numpy.core.records.recarray is the
# same class as numpy.recarray, so it is not registered a second time (and
# the deprecated numpy.core namespace is not touched).
sqlite.register_adapter(numpy.ndarray, adapt_numpyarray)
sqlite.register_adapter(numpy.recarray, adapt_numpyarray)
sqlite.register_adapter(tuple, adapt_object)
sqlite.register_adapter(list, adapt_object)
sqlite.register_converter("NumpyArray", convert_numpyarray)
sqlite.register_converter("Object", convert_object)
51
52
54 """A SQL table that returns (mostly) rec arrays.
55
.. method:: SQLarray([name[,records[,filename[,columns[,cachesize=5[,connection=None[,is_tmp=False]]]]]]])
57
58 :Arguments:
59 name
60 table name (can be referred to as '__self__' in SQL queries)
61 records
62 numpy record array that describes the layout and initializes the
63 table OR any iterable (and then columns must be set, too) OR a string
64 that contains a single, *simple reStructured text table* (and the table name is
65 set from the table name in the reST table.)
66 If ``None`` then simply associate with existing table name.
67 filename
As an alternative to *records*, read a reStructuredText table from *filename*.
69 columns
70 sequence of column names (only used if records does not have
71 attribute dtype.names) [``None``]
72 cachesize
73 number of (query, result) pairs that are cached [5]
74 connection
75 If not ``None``, reuse this connection; this adds a new table to the same
76 database, which allows more complicated queries with cross-joins. The
77 table's connection is available as the attribute T.connection. [``None``]
78 is_tmp
79 ``True``: create a tmp table; ``False``: regular table in db [``False``]
80
81 :Bugs:
82 * :exc:`InterfaceError`: *Error binding parameter 0 - probably unsupported type*
83
84 In this case the recarray contained types such as ``numpy.int64`` that are not
85 understood by sqlite. Either convert the data manually (by setting the numpy
dtypes yourself on the recarray), or better, feed a simple list of tuples ("records")
87 to this class in *records*. Make sure that these tuples only contain standard python types.
88 Together with *records* you will also have to supply the names of the data columns
89 in the keyword argument *columns*.
90
91 If you are reading from a file then it might be simpler to
92 use :func:`recsql.sqlarray.SQLarray_fromfile`.
93 """
94
# Reserved table name: merge() loads the incoming records into a temporary
# table with this name, and __init__ rejects it unless is_tmp=True.
tmp_table_name = '__tmp_merge_table'
96
def __init__(self,name=None ,records=None, filename=None, columns=None,
             cachesize=5, connection=None, is_tmp=False, **kwargs):
    """Build the SQL table from a numpy record array (or other records).

    Without *records* and *filename* the instance is attached to the
    existing table *name*. Otherwise a new table is created and filled; it
    is ``TEMPORARY`` if *is_tmp* is set. A string in *records*, or the
    contents of *filename*, is parsed as a reST table with
    :class:`Table2array` (*kwargs* are passed on). The reST table then also
    supplies the column names and the table name.

    :Raises:
       :exc:`ValueError`
          if the table name equals the reserved :attr:`tmp_table_name` and
          *is_tmp* is ``False``, if neither a name nor records are given, or
          if *name* does not refer to an existing table;
       :exc:`TypeError`
          if *records* has no ``dtype.names`` and *columns* is ``None``.
    """
    reserved_msg = 'name = %s is reserved, choose another one'
    self.name = str(name)
    if self.name == self.tmp_table_name and not is_tmp:
        raise ValueError(reserved_msg % name)
    if connection is None:
        self.connection = sqlite.connect(':memory:',
                detect_types=sqlite.PARSE_DECLTYPES | sqlite.PARSE_COLNAMES)
        # a new database: register the additional SQL functions on it
        self._init_sqlite_functions()
    else:
        # reuse the connection so that several tables share one database
        self.connection = connection
    self.cursor = self.connection.cursor()

    if records is None and filename is None:
        if name is None:
            raise ValueError("Provide either an existing table name or a source of records.")
        # Associate with an existing table: the query returns no rows but
        # fills cursor.description, which provides the column names.
        SQL = "SELECT * FROM %(name)s WHERE 0" % vars(self)
        c = self.cursor
        try:
            c.execute(SQL)
        except sqlite.OperationalError as err:
            msg = str(err)
            if 'no such table' in msg or 'syntax error' in msg:
                raise ValueError("Provide existing legal 'name' of an existing table not %r"
                                 % self.name)
            raise
        self.columns = tuple([x[0] for x in c.description])
        self.ncol = len(self.columns)
    else:
        if records is None:
            # only a filename was given: read the whole file and close it
            with open(filename, 'r') as fh:
                records = fh.read()

        if type(records) is str:
            # a reST table supplies the records, column names and table name
            P = Table2array(records, **kwargs)
            P.parse()
            records = P.records
            columns = P.names
            self.name = P.tablename
            # the check above only saw the *name* argument
            if self.name == self.tmp_table_name and not is_tmp:
                raise ValueError(reserved_msg % self.name)
        try:
            self.columns = records.dtype.names
        except AttributeError:
            if columns is None:
                raise TypeError('records must be a recarray or columns should be supplied')
            self.columns = columns
        self.ncol = len(self.columns)

        # NOTE: table and column names are pasted into the SQL unescaped;
        # do not use with untrusted input.
        if not is_tmp:
            SQL = "CREATE TABLE "+self.name+" ("+",".join(self.columns)+")"
        else:
            SQL = "CREATE TEMPORARY TABLE "+self.name+" ("+",".join(self.columns)+")"
        self.cursor.execute(SQL)
        SQL = "INSERT INTO "+self.name+" ("+ ",".join(self.columns)+") "\
              +"VALUES "+"("+",".join(self.ncol*['?'])+")"
        try:
            # fails with InterfaceError ("Error binding parameter ...") for
            # values sqlite cannot bind, e.g. numpy.int64 (see the class docs)
            self.cursor.executemany(SQL,records)
        except Exception:
            import sys
            sys.stderr.write("ERROR: You are probably feeding a recarray; sqlite does not know how to \n"
                             " deal with special numpy types such as int32 or int64. Try using the \n"
                             " recsql.SQLarray_fromfile() function or feed simple records (see docs).\n")
            raise

    # (query, result) cache used by sql()
    self.__cache = KRingbuffer(cachesize)
175
@property
def recarray(self):
    """Return underlying SQL table as a read-only record array."""
    # read-only: no setter is defined; every access runs SELECT *
    return self.SELECT('*')
182
def merge(self,recarray,columns=None):
    """Merge another recarray with the same columns into this table.

    The records are first loaded into a temporary table named
    :attr:`tmp_table_name` on the same connection. They are then copied
    with ``INSERT OR ABORT INTO __self__ SELECT * FROM <tmp table>``. The
    temporary table is dropped afterwards, also when an error occurs.

    :Arguments:
       recarray
          numpy record array that describes the layout and initializes the
          table
       columns
          column names; only needed when *recarray* has no ``dtype.names``
          [``None``]

    :Returns:
       n number of inserted rows

    :Raises:
       Raises an exception if duplicate and incompatible data exist
       in the main table and the new one.
    """
    len_before = len(self)
    try:
        tmparray = SQLarray(self.tmp_table_name, records=recarray, columns=columns,
                            connection=self.connection, is_tmp=True)
        len_tmp = len(tmparray)

        SQL = """INSERT OR ABORT INTO __self__ SELECT * FROM %s""" % self.tmp_table_name
        self.sql(SQL)
        len_after = len(self)
    finally:
        # Always remove the temporary table. CREATE TEMPORARY TABLE has no
        # IF NOT EXISTS, so a table left over by a failed merge would make
        # every later merge() fail.
        self.sql("DROP TABLE IF EXISTS %s" % self.tmp_table_name,
                 asrecarray=False, cache=False)
    n_inserted = len_after - len_before
    assert len_tmp == n_inserted, \
        "inserted %d rows but the merged recarray has %d" % (n_inserted, len_tmp)
    del tmparray
    return n_inserted
211
213 """Merge an existing table in the database with the __self__ table.
214
215 Executes as ``'INSERT INTO __self__ SELECT * FROM <name>'``.
216 However, this method is probably used less often than the simpler :meth:`merge`.
217
218 :Arguments:
219 name name of the table in the database (must be compatible with __self__)
220
221 :Returns:
222 n number of inserted rows
223 """
224 l_before = len(self)
225 SQL = """INSERT OR ABORT INTO __self__ SELECT * FROM %s""" % name
226 self.sql(SQL)
227 l_after = len(self)
228 return l_after - l_before
229
def sql_index(self,index_name,column_names,unique=True):
    """Add a named index on given columns to improve performance.

    :Arguments:
       index_name
          name of the new index
       column_names
          a single column name (any ``str``, including subclasses such as
          ``numpy.str_``) or a non-empty sequence of column names
       unique
          ``True`` creates a ``UNIQUE`` index [``True``]

    :Raises:
       :exc:`ValueError` if *column_names* is empty or has no length.

    .. warning:: Names are interpolated into the SQL without escaping.
    """
    # isinstance (not type() ==) so that str subclasses count as one name
    # instead of being joined character by character
    if isinstance(column_names, str):
        column_names = [column_names]
    try:
        n_columns = len(column_names)
    except TypeError:
        n_columns = 0
    if n_columns == 0:
        raise ValueError("Provide a list of column names for an index.")
    if unique:
        UNIQUE = "UNIQUE"
    else:
        UNIQUE = ""
    columns = ",".join(column_names)
    SQL = "CREATE %s INDEX %s ON %s (%s)" % (UNIQUE, index_name, self.name, columns)
    self.sql(SQL)
248
def sql_select(self,fields,*args,**kwargs):
    """Run ``SELECT <fields> FROM __self__ [args]`` and return the result.

    *fields* is converted with ``str()``. The optional positional *args*
    are SQL clauses appended after the ``FROM`` part, separated by single
    spaces. Keyword arguments are handed unchanged to :meth:`sql`, which
    also substitutes ``__self__`` by the table name and returns a numpy
    record array where possible.

    Example: list the students whose average grade is below 3::

       result = T.SELECT("surname, subject, year, avg(grade) AS avg_grade",
                         "WHERE avg_grade < 3", "GROUP BY surname,subject",
                         "ORDER BY avg_grade,surname")

    which executes::

       SELECT surname, subject, year, avg(grade) AS avg_grade FROM __self__
       WHERE avg_grade < 3
       GROUP BY surname,subject
       ORDER BY avg_grade,surname

    Aggregate functions such as avg() can be used, and ``__self__`` also
    works in joins (``LEFT JOIN __self__ WHERE ...``).

    .. Note:: See :meth:`~SQLarray.sql` for the available keyword arguments
              and for ``?`` parameter interpolation.
    """
    clauses = " ".join(args)
    statement = "SELECT %s FROM __self__ %s" % (str(fields), clauses)
    return self.sql(statement, **kwargs)

SELECT = sql_select
290
def sql(self,SQL,parameters=None,asrecarray=True,cache=True):
    """Execute sql statement.

    :Arguments:
       SQL : string
          Full SQL command; can contain the ``?`` place holder so that values
          supplied with the ``parameters`` keyword can be interpolated using
          the ``pysqlite`` interface.
       parameters : tuple
          Parameters for ``?`` interpolation.
       asrecarray : boolean
          ``True``: return a ``numpy.recarray`` if possible;
          ``False``: return records as a list of tuples. [``True``]
       cache : boolean
          Should the results be cached? Set to ``False`` for large queries to
          avoid memory issues. Queries with ``?`` place holders are never
          cached (neither looked up nor stored). [``True``]

    .. warning::
       There are **no sanity checks** applied to the SQL.

    If possible, the returned list of tuples is turned into a numpy record
    array; otherwise the original list of tuples is returned. A
    :exc:`MemoryError` during that conversion is re-raised rather than
    silently falling back to the list. A query without result rows
    returns ``[]``.

    The last cachesize results are cached (for cache=True), keyed by the
    SQL string together with *asrecarray*. A cached result (the same object)
    is returned directly unless the table has been modified. The cache is
    cleared whenever the cursor reports modified rows or the statement
    contains ``DELETE``.

    .. Note:: '__self__' is substituted with the table name. See the doc
              string of the :meth:`SELECT` method for more details.
    """
    SQL = SQL.replace('__self__',self.name)

    # Results of '?' queries depend on *parameters*, which are not part of
    # the cache key, so such queries bypass the cache entirely.
    cacheable = cache and '?' not in SQL
    # asrecarray is part of the key: both result forms can be cached.
    key = (SQL, bool(asrecarray))

    if cacheable and key in self.__cache:
        return self.__cache[key]

    c = self.cursor
    if parameters is None:
        c.execute(SQL)
    else:
        c.execute(SQL, parameters)

    if c.rowcount > 0 or 'DELETE' in SQL.upper():
        # the table (probably) changed: all cached results may be stale
        self.__cache.clear()
    result = c.fetchall()
    if not result:
        return []
    if asrecarray:
        try:
            names = [x[0] for x in c.description]
            result = numpy.rec.fromrecords(result,names=names)
        except MemoryError:
            # do not hide memory problems behind a silent fallback
            raise
        except Exception:
            # e.g. heterogeneous rows: keep the list of tuples
            pass
    if cacheable:
        self.__cache.append(key,result)
    return result
371
373 """Return minimum and maximum of variable across all rows of data."""
374 (vmin,vmax), = self.SELECT('min(%(variable)s), max(%(variable)s)' % vars())
375 return vmin,vmax
376
def selection(self,SQL,parameters=None,**kwargs):
    """Return a new SQLarray from a SELECT selection.

    This is a very useful method because it allows one to build complicated
    selections and essentially new tables from existing data.

    Examples::

       s = T.selection('a > 3')
       s = T.selection('a > ?', (3,))
       s = T.selection('SELECT * FROM __self__ WHERE a > ? AND b < ?', (3, 10))

    Only the text before the first ``;`` in *SQL* is used. Unless that text
    has the form ``SELECT ... FROM ...`` (possibly spanning several lines),
    it is taken as the condition in ``SELECT * FROM __self__ WHERE <SQL>``.

    The result is stored with ``CREATE TABLE ... AS`` on the same
    connection. The new table is called *name* if that keyword is given.
    Otherwise it is called ``selection_<md5 hex digest>``, where the hash
    covers the final query and, if given, *parameters*; the same query with
    different parameters therefore gets a different table. Other keyword
    arguments are ignored.

    .. warning:: *SQL* and *name* are not sanitized.
    """
    import hashlib

    # keep only the first statement
    safe_sql = re.match(r'(?P<SQL>[^;]*)',SQL).group('SQL')

    # DOTALL so that 'SELECT ...\nFROM ...' is recognized as a full query
    if re.match(r'\s*SELECT.*FROM', safe_sql, flags=re.IGNORECASE | re.DOTALL):
        _sql = safe_sql
    else:
        _sql = "SELECT * FROM __self__ WHERE " + str(safe_sql)
    _sql = _sql.replace('__self__', self.name)

    if 'name' in kwargs:
        newname = kwargs.pop('name')
    else:
        key = _sql
        if parameters is not None:
            key = key + repr(parameters)
        if not isinstance(key, bytes):
            # Python 3 str / Python 2 unicode must be encoded for hashing
            key = key.encode('utf-8')
        newname = 'selection_' + hashlib.md5(key).hexdigest()

    _sql = "CREATE TABLE %s AS " % newname + _sql

    c = self.cursor
    if parameters is None:
        c.execute(_sql)
    else:
        c.execute(_sql, parameters)

    return SQLarray(newname, None, connection=self.connection)
421
def _init_sqlite_functions(self):
    """Register extra SQL functions and aggregates on ``self.connection``.

    The implementations come from the :mod:`sqlfunctions` module. They are
    registered in a fixed order: the scalar functions first, then the
    aggregates.
    """
    import sqlfunctions

    # (SQL name, number of arguments, attribute of sqlfunctions)
    scalar_functions = (
        ("sqrt", 1, "_sqrt"),
        ("fformat", 2, "_fformat"),
    )
    aggregate_functions = (
        ("std", 1, "_Stdev"),
        ("median", 1, "_Median"),
        ("array", 1, "_NumpyArray"),
        ("histogram", 4, "_NumpyHistogram"),
        ("distribution", 4, "_NormedNumpyHistogram"),
        ("meanhistogram", 5, "_MeanHistogram"),
        ("stdhistogram", 5, "_StdHistogram"),
        ("minhistogram", 5, "_MinHistogram"),
        ("maxhistogram", 5, "_MaxHistogram"),
        ("medianhistogram", 5, "_MedianHistogram"),
        ("zscorehistogram", 5, "_ZscoreHistogram"),
    )
    for sql_name, n_args, attr in scalar_functions:
        self.connection.create_function(sql_name, n_args, getattr(sqlfunctions, attr))
    for sql_name, n_args, attr in aggregate_functions:
        self.connection.create_aggregate(sql_name, n_args, getattr(sqlfunctions, attr))
439
441 """Number of rows in the table."""
442 return self.SELECT('COUNT() AS length').length[0]
443
445 """Delete the underlying SQL table from the database."""
446 SQL = """DROP TABLE IF EXISTS __self__"""
447 self.sql(SQL, asrecarray=False, cache=False)
448
449
450 try:
451 import collections
class Fifo(collections.deque):
    """A deque whose pop() removes items from the left end.

    Appending on the right and popping with pop() therefore gives
    first-in-first-out order.
    """
    def pop(self):
        return collections.deque.popleft(self)
454
456 """Ring buffer of size capacity; 'pushes' data from left and discards on the right.
457 """
458
def __init__(self,capacity,iterable=None):
    """Fill the buffer from *iterable* (empty if ``None``) and trim it.

    After initialisation at most *capacity* items remain. Surplus items are
    removed one at a time with the base class's ``pop()``.
    NOTE(review): the base class is not visible here; if it is ``Fifo``,
    ``pop()`` is ``deque.popleft``, so the left-most items are discarded.
    """
    super(Ringbuffer,self).__init__([] if iterable is None else iterable)
    assert capacity > 0
    self.capacity = capacity
    surplus = len(self) - self.capacity
    for _ in range(surplus):
        super(Ringbuffer,self).pop()
471 return "Ringbuffer(capacity="+str(self.capacity)+", "+str(list(self))+")"
472 except ImportError:
474 """Ringbuffer that can be treated as a list.
475
476 Note that the real queuing order is only obtained with the
477 :meth:`tolist` method.
478
479 Based on
480 http://www.onlamp.com/pub/a/python/excerpt/pythonckbk_chap1/index1.html
481 """
def __init__(self, capacity, iterable=None):
    """Create a ring buffer holding at most *capacity* items.

    :Arguments:
       capacity
          maximum number of items kept (must be > 0)
       iterable
          optional initial items; only the last *capacity* of them are kept
    """
    assert capacity > 0
    self.capacity = capacity
    if iterable is not None:
        # keep only the newest `capacity` items
        self[:] = list(iterable)[-self.capacity:]
    # A new list instance is already empty, so no reset is needed when
    # iterable is None (rebinding the local name `self` would have no effect).
489
492 self[self.cur] = x
493 self.cur = (self.cur+1) % self.capacity
495 """Return a list of elements from the oldest to the newest."""
496 return self[self.cur:] + self[:self.cur]
497
499 super(Ringbuffer,self).append(x)
500 if len(self) >= self.capacity:
501 self[:] = self[-self.capacity:]
502 self.cur = 0
503
504 self.__class__ = self.__Full
506 for x in list(iterable)[-self.capacity:]:
507 self.append(x)
509 """Return a list of elements from the oldest to the newest."""
510 return self
512 return "Ringbuffer(capacity="+str(self.capacity)+", "+str(list(self))+")"
513
514
515
517 """Ring buffer with key lookup.
518
519 Basically a ringbuffer for the keys and a dict (k,v) that is
520 cleaned up to reflect the keys in the Ringbuffer.
521 """
522 - def __init__(self,capacity,*args,**kwargs):
533 """Reinitialize the KRingbuffer to empty."""
534 self.__ringbuffer = Ringbuffer(self.capacity)
535 self._prune()
537 """Primitive way to keep dict in sync with RB."""
538 delkeys = [k for k in self.keys() if k not in self.__ringbuffer]
539 for k in delkeys:
540 super(KRingbuffer,self).__delitem__(k)
542 raise NotImplementedError('Only append() is supported.')
544 raise NotImplementedError('Only pop() is supported.')
def update(self, *args, **kwargs):
    """Not supported: entries can only be added with append()."""
    message = 'Only append() is supported.'
    raise NotImplementedError(message)
547
def SQLarray_fromfile(filename, **kwargs):
    """Create a :class:`SQLarray` from *filename*.

    The file suffix (case-insensitive) selects the parser:

    rst, txt
        reStructuredText table (see :mod:`recsql.rest_table`)
    csv
        comma-separated values (see :mod:`recsql.csv_table`)

    Any other suffix raises :exc:`KeyError`.

    :Arguments:
       *filename*
          name of the file that contains the data with the appropriate
          file extension
       *kwargs*
          - additional arguments for :class:`SQLarray`
          - additional arguments for :class:`recsql.csv_table.Table2array`
            or :class:`recsql.rest_table.Table2array`, namely *active*,
            *mode* (default ``'singlet'``), *autoconvert*, *automapping*
            and *sep*

    The table name defaults to the one found by the parser. *columns* and
    *records* are always taken from the parsed table.
    """
    import rest_table, csv_table

    readers = {
        'rst': rest_table.Table2array,
        'txt': rest_table.Table2array,
        'csv': csv_table.Table2array,
    }

    # move the parser options out of the SQLarray keyword arguments
    parser_kwargs = {}
    for option in ('active', 'mode', 'autoconvert', 'automapping', 'sep'):
        if option in kwargs:
            parser_kwargs[option] = kwargs.pop(option)
    parser_kwargs.setdefault('mode', 'singlet')

    suffix = os.path.splitext(filename)[1]
    if suffix.startswith('.'):
        suffix = suffix[1:]
    suffix = suffix.lower()
    parser_kwargs['filename'] = filename
    table = readers[suffix](**parser_kwargs)

    kwargs.setdefault('name', table.tablename)
    kwargs['columns'] = table.names
    kwargs['records'] = table.records
    return SQLarray(**kwargs)
587