Tuesday, January 25, 2011

A Helper Module for PostgreSQL and Psycopg2

I have created little_pger, a Python "modulet" meant to help with common PostgreSQL/Psycopg2 tasks, by wrapping up a few things handily, and offering a coherent and pythonic interface to it.

Say we have a PG table like this:

create table document (
    document_id serial primary key,  
    title text,  
    type text check (type in ('book', 'article', 'essay'),  
    topics text[]
);

and a pair of connection/cursor objects:

>>> conn = psycopg2.connect("dbname=...")
>>> cur = conn.cursor()

You can then insert a new document record like this:

>>> insert(cur, 'document', values={'title':'PG is Easy'})

and update it like this:

>>> update(cur, 'document', set={'type':'article'}, where={'title':'PG is Easy'})

Note that you are still responsible for managing any transaction externally:

>>> conn.commit()

With the 'return_id' option (which restricts the default 'returning *' clause to the primary key's value, which is assumed to be named '<table>_id'), the insert/update above could also be done this way:

>>> doc_id = insert(cur, 'document', values={'title':'PG is Easy'}, return_id=True)
>>> update(cur, 'document', values={'type':'article'}, where={'document_id':doc_id})

Note that the 'set' or 'values' keywords can both be used with 'update'. Using a tuple (but not a list!) as a value in the 'where' dict param is translated to the proper SQL 'in' operator:

>>> select(cur, 'document', where={'type':('article', 'book')})

will return all article or book documents, whereas:

>>> select(cur, 'document', what='title', where={'type':('article', 'book')})

will only get their titles. Using a list (but not a tuple!) as a value in either the 'values' or 'where' dict params is translated to the proper SQL array syntax:

>>> update(cur, 'document', set={'topics':['database', 'programming']}, where={'document_id':doc_id})
>>> select(cur, 'document', where={'topics':['database', 'programming']})

The 'filter_values' option is useful if you do not wish to care about the exact values sent to the function. This for instance would fail:

>>> insert(cur, 'document', values={'title':'PG is Easy', 'author':'John Doe'})

because there is no 'author' column in our document table. This however would work:

>>> insert(cur, 'document', values={'title':'PG is Easy', 'author':'John Doe'}, filter_values=True)

because it trims any extra items in 'values' (i.e. corresponding to columns not belonging to the table). Note that since this option requires an extra SQL query, it makes a single call a little less efficient.

You can always append additional projection elements to a select query with the 'what' argument (which can be a string, a list or a dict, depending on your needs):

>>> select(cur, 'document', what={'*':1, 'title is not null':'has_title'})

will be translated as:

select *, (title is not null) as has_title from document

Similarly, by using the 'group_by' argument:

>>> select(cur, 'document', what=['type', 'count(*)'], group_by='type')

will yield:

select type, count(*) from document group by type

A select query can also be called with 'order_by', 'limit' and 'offset' optional arguments. You can also restrict the results to only one row by using the 'rows' argument (default is rows='all'):

>>> select(cur, 'document', where={'type':'article'], rows='one')

would return directly a document row (and not a list of rows), and would actually throw an assertion exception if there was more than one article in the document table.

This module is available for download and as a repository on GitHub.

Friday, December 3, 2010

Up the Down Penrose Staircase

I recently stumbled upon this little problem on Wired, inspired by the Penrose Staircase:

http://www.wired.com/magazine/2010/11/pl_decode_staircase

Although I'm pretty sure it was meant to be solved by hand (as a Sudoku problem would be, presumably) I thought it would be nice to find a solution less tediously, using a few lines of Python. Plus, my recursive function writing ability being a little rusty, this would make me practice a bit. So I started with a very simple way of representing the problem space:

stairs = [6, 1, 2, 6, 5, 3, 5, 2]


Even though the staircase itself is rather circular (or infinite), our data structure needs not to be, of course: it just has to wrap around. Next we need two helper functions to go up and down the stairs, using the special rules (N-1 steps when going up, N+1 when going down). It's also worth noting that our algorithm makes use of the step positions (or array index, i.e. 0 to n_stairs-1), not their values (which would be ambiguous anyway, because some of them are used twice). So these two functions take a position as their input, and they also return a position. Their only complication are the wrap around cases, which must be handled using the right position calculations.

# 0 (6) -> 5 (3)
# 5 (3) -> 7 (2)
# 6 (5) -> 2 (2)
def up(pos, stairs):
k = stairs[pos] - 1
if pos + k < len(stairs): return pos + k
else: return k - (len(stairs) - pos)

# 0 (6) -> 1 (1)
# 5 (3) -> 1 (1)
# 6 (5) -> 0 (6)
def down(pos, stairs):
k = stairs[pos] + 1
if pos - k >= 0: return pos - k
else: return len(stairs) - (k - pos)


With these, we can now write a function to explore the possible paths leading to a solution. It's rather reasonable to explore them all, in a brute force kind of way, given that the problem imposes a fixed length for a solution to be valid (it wouldn't make sense to continue searching beyond this bound). Our function is recursive: at each step in the exploration of a given path, it first checks if it yields a valid solution (i.e. (1) with the right length, (2) all the positions having been visited and (3) we are back to where we have started), in which case it prints it (in both the position and value spaces) and returns, and if not, it calls itself twice (up and down) to continue the exploration.

def solve(solution, stairs):
n_stairs = len(stairs)
# is solution valid?
if len(solution) == (n_stairs + 1):
if range(n_stairs) == sorted(solution[1:]) and solution[-1] == 0:
print solution, [stairs[p] for p in solution]
return
# go up
solve(solution + [up(solution[-1])])
# go down
solve(solution + [down(solution[-1])])


It's also possible to get rid of the helper functions altogether by dissolving their logic in previous function's body (which makes it a little bit harder to read and interpret, admittedly, but produces a more compact solution):

def solve(solution, stairs):
n_stairs = len(stairs)
if len(solution) == (n_stairs + 1):
if range(n_stairs) == sorted(solution[1:]) and solution[-1] == 0:
print solution, [stairs[p] for p in solution]
return
p = solution[-1]
up = stairs[p] - 1
down = stairs[p] + 1
solve(solution + [p + up if p + up < n_stairs else up - (n_stairs - p)], stairs)
solve(solution + [p - down if p - down >= 0 else n_stairs - (down - p)], stairs)


To use either of these solvers, we pass it an incomplete solution (which must begin on the first step, hence the first position (0) already given) and the staircase data structure:

solve([0], stairs)


Finally, if you enjoy this kind of problem, requiring some programming to solve (although not necessarily), I suggest you take a look at Project Euler, which is very challenging and fun.