Source code for iiqtools.utils.database

# -*- coding: UTF-8 -*-
"""
Utilities for interacting with the InsightIQ database
"""
from collections import namedtuple

import psycopg2

from iiqtools.exceptions import DatabaseError


_Column = namedtuple('Column', 'name type')

# This allows us to define the docstring for out namedtuple
[docs]class Column(_Column): """A database column consiting of the column's name, and it's type :Type: namedtuple :param name: The column's name :param type: The database type for the given column (i.e. int, float, double) """ pass
# This database abstraction intentionally does not comply with PEP 249 # https://www.python.org/dev/peps/pep-0249/ # Why? It's a bit much if you're not an experienced developer or you only # need to perform simple interactions with a database. The point of this # abstraction to have a simple, easy to use object for poking the IIQ database
[docs]class Database(object): """Simplifies communication with the database. The goal of this object is to make basic interactions with the database simpler than directly using the psycopg2 library. It does this by reducing the number of API methods, providing handy built-in methods for common needs (like listing tables of a database), auto-commit of transactions, and auto-rollback of bad SQL transactions. This object is not indented for power users, or long lived processes (like the InsightIQ application); it's designed for shorter lived "scripts". :param user: The username when connection to the databse :type user: String, default postgres :param dbname: The specific database to connection to. InsightIQ utilizes a different database for every monitored cluster, plus one generic database for the application (named "insightiq"). :type dbname: String, default insightiq """ def __init__(self, user='postgres', dbname='insightiq'): self._connection = psycopg2.connect(user=user, dbname=dbname) self._cursor = self._connection.cursor() if dbname == 'insightiq': self.execute("SET search_path to admin,iiq,public;") def __enter__(self): """Enables use of the ``with`` statement to auto close database connection https://docs.python.org/2.7/reference/datamodel.html#with-statement-context-managers Example:: with Database() as db: print db.cluster_databases """ return self def __exit__(self, exc_type, exc_value, the_traceback): self._connection.close()
[docs] def execute(self, sql, params=None): """Run a single SQL command :Returns: Generator :param sql: **Required** The SQL syntax to execute :type sql: String :param params: The values to use in a parameterized SQL query :type params: Iterable This method is implemented as a Python Generator: https://wiki.python.org/moin/Generators This means you are suppose to iterate over the results:: db = Database() for row in db.execute("select * from some_table;"): print row If you want all the rows as a single thing, just use ``list``:: db = Database() data = list(db.execute("select * from some_table;") But **WARNING** that might cause your program to run out of memory and crash! That reason is why this method is a generator by default ;) To perform a parameterized query (i.e. avoid SQL injection), provided the parameters as an iterable:: db = Database() # passing in "foo_column" alone would try and string format every # character of "foo_column" into your SQL statement. # Instead, make "foo_column" a tuple by wrapping it like ("foo_column",) # Note: the trailing comma is required. data = list(db.execute("select %s from some_table", ("foo_column",))) """ return self._query(sql, params=params, many=False)
[docs] def executemany(self, sql, params): """Run the SQL for every iteration of the supplied params This method behaves exactly like `execute`, except that it can perform multiple SQL commands in a single transaction. The point of this method is so you can retain Atomicity when you must execute the same SQL with different parameters. This method isn't intended to be faster than looping over the normal `execute` method with the different parameters. :Returns: Generator :param sql: **Required** The SQL syntax to execute :type sql: String :param params: **Required** The parameterized values to iterate :type params: Iterable """ return self._query(sql, params=params, many=True)
def _query(self, sql, params=None, many=False): """Internal method for running SQL commands The code difference between execute, and executemany is just the method we call on the cursor object. :Returns: Generator :param sql: **Required** The SQL syntax to execute :type sql: String :param params: The values to use in a parameterized SQL query :type params: Iterable :param many: Set to True to call `executemany` :type many: Boolean, default is False """ if many: call = getattr(self._cursor, 'executemany') else: call = getattr(self._cursor, 'execute') try: call(sql, params) self._connection.commit() except psycopg2.Error as doh: # All psycopg2 Exceptions are subclassed from psycopg2.Error self._connection.rollback() raise DatabaseError(message=doh.pgerror, pgcode=doh.pgcode) else: data = self._cursor.fetchone() while data: yield data data = self._cursor.fetchone() @property def isolation_level(self): """Set the isolation level of your connnection to the database""" # To drop tables, you'll have to set isolation_level to 0 (zero) # https://www.postgresql.org/docs/current/static/transaction-iso.html return self._connection.isolation_level @isolation_level.setter def isolation_level(self, value): # https://www.postgresql.org/docs/current/static/transaction-iso.html self._connection.set_isolation_level(value)
[docs] def tables(self): """Obtain a list of all the tables for the database you're connected to :Returns: List """ sql = """SELECT relname FROM pg_class WHERE relkind='r' AND relname !~ '^(pg_|sql_)';""" # data looks like [('sometable1',), ('sometable2',)] # drop all the tuples, so we just return a list of strings return [x[0] for x in self.execute(sql)]
[docs] def cluster_databases(self): """Obtain a list of all the cluster databases :Returns: List """ sql = """SELECT datname from pg_database;""" dbs = list(self.execute(sql)) ignore = ('template0', 'template1', 'postgres', 'insightiq') return [x[0] for x in dbs if x[0] not in ignore]
[docs] def table_schema(self, table): """Given a table, return the schema for that table :Returns: Tuple of namedtuples -> (Column(name, type), Column(name, type)) :param table: **Required** The table to obtain the primary key from :type table: String """ sql = """SELECT column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = %s;""" return tuple([Column(x[0], x[1]) for x in self.execute(sql, (table,))])
[docs] def primary_key(self, table): """Given a table, return the primary key .. note:: If you supply a timeseries table that DOES NOT have an EPOC timestamp in the name, you will get zero results. For timeseries tables, supply a table that contains the EPOC timestamps to see the primary key. :Returns: Tuple of namedtuples -> (Column(name, type), Column(name, type)) :param table: **Required** The table to obtain the primary key from :type table: String """ sql = """SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = %s::regclass AND i.indisprimary;""" return tuple([Column(x[0], x[1]) for x in self.execute(sql, (table,))])
[docs] def close(self): """Disconnect from the database""" self._connection.close()