forked from pool/python-pyodbc
Matej Cepl
f4b9cb2504
- Add sqlitetests.py & testutils.py - Update to v5.1.0 OBS-URL: https://build.opensuse.org/request/show/1145945 OBS-URL: https://build.opensuse.org/package/show/devel:languages:python/python-pyodbc?expand=0&rev=29
693 lines
25 KiB
Python
693 lines
25 KiB
Python
#!/usr/bin/python
|
|
|
|
usage = """\
|
|
%(prog)s [options] connection_string
|
|
|
|
Unit tests for SQLite using the ODBC driver from http://www.ch-werner.de/sqliteodbc
|
|
|
|
To use, pass a connection string as the parameter. The tests will create and
|
|
drop tables t1 and t2 as necessary. On Windows, use the 32-bit driver with
|
|
32-bit Python and the 64-bit driver with 64-bit Python (regardless of your
|
|
operating system bitness).
|
|
|
|
These run using the version from the 'build' directory, not the version
|
|
installed into the Python directories. You must run python setup.py build
|
|
before running the tests.
|
|
|
|
You can also put the connection string into a tmp/setup.cfg file like so:
|
|
|
|
[sqlitetests]
|
|
connection-string=Driver=SQLite3 ODBC Driver;Database=sqlite.db
|
|
"""
|
|
|
|
import sys, os, re
|
|
import unittest
|
|
from decimal import Decimal
|
|
from datetime import datetime, date, time
|
|
from os.path import join, getsize, dirname, abspath
|
|
from testutils import *
|
|
|
|
_TESTSTR = '0123456789-abcdefghijklmnopqrstuvwxyz-'
|
|
|
|
def _generate_test_string(length):
|
|
"""
|
|
Returns a string of `length` characters, constructed by repeating _TESTSTR as necessary.
|
|
|
|
To enhance performance, there are 3 ways data is read, based on the length of the value, so most data types are
|
|
tested with 3 lengths. This function helps us generate the test data.
|
|
|
|
We use a recognizable data set instead of a single character to make it less likely that "overlap" errors will
|
|
be hidden and to help us manually identify where a break occurs.
|
|
"""
|
|
if length <= len(_TESTSTR):
|
|
return _TESTSTR[:length]
|
|
|
|
c = (length + len(_TESTSTR)-1) // len(_TESTSTR)
|
|
v = _TESTSTR * c
|
|
return v[:length]
|
|
|
|
class SqliteTestCase(unittest.TestCase):
|
|
|
|
SMALL_FENCEPOST_SIZES = [ 0, 1, 255, 256, 510, 511, 512, 1023, 1024, 2047, 2048, 4000 ]
|
|
LARGE_FENCEPOST_SIZES = [ 4095, 4096, 4097, 10 * 1024, 20 * 1024 ]
|
|
|
|
STR_FENCEPOSTS = [ _generate_test_string(size) for size in SMALL_FENCEPOST_SIZES ]
|
|
BYTE_FENCEPOSTS = [ bytes(s, 'ascii') for s in STR_FENCEPOSTS ]
|
|
IMAGE_FENCEPOSTS = BYTE_FENCEPOSTS + [ bytes(_generate_test_string(size), 'ascii') for size in LARGE_FENCEPOST_SIZES ]
|
|
|
|
def __init__(self, method_name, connection_string):
|
|
unittest.TestCase.__init__(self, method_name)
|
|
self.connection_string = connection_string
|
|
|
|
def setUp(self):
|
|
self.cnxn = pyodbc.connect(self.connection_string)
|
|
self.cursor = self.cnxn.cursor()
|
|
|
|
for i in range(3):
|
|
try:
|
|
self.cursor.execute("drop table t%d" % i)
|
|
self.cnxn.commit()
|
|
except:
|
|
pass
|
|
|
|
self.cnxn.rollback()
|
|
|
|
def tearDown(self):
|
|
try:
|
|
self.cursor.close()
|
|
self.cnxn.close()
|
|
except:
|
|
# If we've already closed the cursor or connection, exceptions are thrown.
|
|
pass
|
|
|
|
def test_multiple_bindings(self):
|
|
"More than one bind and select on a cursor"
|
|
self.cursor.execute("create table t1(n int)")
|
|
self.cursor.execute("insert into t1 values (?)", 1)
|
|
self.cursor.execute("insert into t1 values (?)", 2)
|
|
self.cursor.execute("insert into t1 values (?)", 3)
|
|
for i in range(3):
|
|
self.cursor.execute("select n from t1 where n < ?", 10)
|
|
self.cursor.execute("select n from t1 where n < 3")
|
|
|
|
|
|
def test_different_bindings(self):
|
|
self.cursor.execute("create table t1(n int)")
|
|
self.cursor.execute("create table t2(d datetime)")
|
|
self.cursor.execute("insert into t1 values (?)", 1)
|
|
self.cursor.execute("insert into t2 values (?)", datetime.now())
|
|
|
|
def test_drivers(self):
|
|
p = pyodbc.drivers()
|
|
self.assertTrue(isinstance(p, list))
|
|
|
|
def test_datasources(self):
|
|
p = pyodbc.dataSources()
|
|
self.assertTrue(isinstance(p, dict))
|
|
|
|
def test_getinfo_string(self):
|
|
value = self.cnxn.getinfo(pyodbc.SQL_CATALOG_NAME_SEPARATOR)
|
|
self.assertTrue(isinstance(value, str))
|
|
|
|
def test_getinfo_bool(self):
|
|
value = self.cnxn.getinfo(pyodbc.SQL_ACCESSIBLE_TABLES)
|
|
self.assertTrue(isinstance(value, bool))
|
|
|
|
def test_getinfo_int(self):
|
|
value = self.cnxn.getinfo(pyodbc.SQL_DEFAULT_TXN_ISOLATION)
|
|
self.assertTrue(isinstance(value, int))
|
|
|
|
def test_getinfo_smallint(self):
|
|
value = self.cnxn.getinfo(pyodbc.SQL_CONCAT_NULL_BEHAVIOR)
|
|
self.assertTrue(isinstance(value, int))
|
|
|
|
def _test_strtype(self, sqltype, value, colsize=None):
|
|
"""
|
|
The implementation for string, Unicode, and binary tests.
|
|
"""
|
|
assert colsize is None or (value is None or colsize >= len(value))
|
|
|
|
if colsize:
|
|
sql = "create table t1(s {}({}))".format(sqltype, colsize)
|
|
else:
|
|
sql = "create table t1(s %s)" % sqltype
|
|
|
|
self.cursor.execute(sql)
|
|
self.cursor.execute("insert into t1 values(?)", value)
|
|
v = self.cursor.execute("select * from t1").fetchone()[0]
|
|
self.assertEqual(type(v), type(value))
|
|
|
|
if value is not None:
|
|
self.assertEqual(len(v), len(value))
|
|
|
|
self.assertEqual(v, value)
|
|
|
|
# Reported by Andy Hochhaus in the pyodbc group: In 2.1.7 and earlier, a hardcoded length of 255 was used to
|
|
# determine whether a parameter was bound as a SQL_VARCHAR or SQL_LONGVARCHAR. Apparently SQL Server chokes if
|
|
# we bind as a SQL_LONGVARCHAR and the target column size is 8000 or less, which is considers just SQL_VARCHAR.
|
|
# This means binding a 256 character value would cause problems if compared with a VARCHAR column under
|
|
# 8001. We now use SQLGetTypeInfo to determine the time to switch.
|
|
#
|
|
# [42000] [Microsoft][SQL Server Native Client 10.0][SQL Server]The data types varchar and text are incompatible in the equal to operator.
|
|
|
|
self.cursor.execute("select * from t1 where s=?", value)
|
|
|
|
|
|
def _test_strliketype(self, sqltype, value, colsize=None):
|
|
"""
|
|
The implementation for text, image, ntext, and binary.
|
|
|
|
These types do not support comparison operators.
|
|
"""
|
|
assert colsize is None or (value is None or colsize >= len(value))
|
|
|
|
if colsize:
|
|
sql = "create table t1(s {}({}))".format(sqltype, colsize)
|
|
else:
|
|
sql = "create table t1(s %s)" % sqltype
|
|
|
|
self.cursor.execute(sql)
|
|
self.cursor.execute("insert into t1 values(?)", value)
|
|
v = self.cursor.execute("select * from t1").fetchone()[0]
|
|
self.assertEqual(type(v), type(value))
|
|
|
|
if value is not None:
|
|
self.assertEqual(len(v), len(value))
|
|
|
|
self.assertEqual(v, value)
|
|
|
|
#
|
|
# text
|
|
#
|
|
|
|
def test_text_null(self):
|
|
self._test_strtype('text', None, 100)
|
|
|
|
# Generate a test for each fencepost size: test_text_0, etc.
|
|
def _maketest(value):
|
|
def t(self):
|
|
self._test_strtype('text', value, len(value))
|
|
return t
|
|
for value in STR_FENCEPOSTS:
|
|
locals()['test_text_%s' % len(value)] = _maketest(value)
|
|
|
|
def test_text_upperlatin(self):
|
|
self._test_strtype('varchar', 'á')
|
|
|
|
#
|
|
# blob
|
|
#
|
|
|
|
def test_null_blob(self):
|
|
self._test_strtype('blob', None, 100)
|
|
|
|
def test_large_null_blob(self):
|
|
# Bug 1575064
|
|
self._test_strtype('blob', None, 4000)
|
|
|
|
# Generate a test for each fencepost size: test_unicode_0, etc.
|
|
def _maketest(value):
|
|
def t(self):
|
|
self._test_strtype('blob', value, len(value))
|
|
return t
|
|
for value in BYTE_FENCEPOSTS:
|
|
locals()['test_blob_%s' % len(value)] = _maketest(value)
|
|
|
|
def test_subquery_params(self):
|
|
"""Ensure parameter markers work in a subquery"""
|
|
self.cursor.execute("create table t1(id integer, s varchar(20))")
|
|
self.cursor.execute("insert into t1 values (?,?)", 1, 'test')
|
|
row = self.cursor.execute("""
|
|
select x.id
|
|
from (
|
|
select id
|
|
from t1
|
|
where s = ?
|
|
and id between ? and ?
|
|
) x
|
|
""", 'test', 1, 10).fetchone()
|
|
self.assertNotEqual(row, None)
|
|
self.assertEqual(row[0], 1)
|
|
|
|
def _exec(self):
|
|
self.cursor.execute(self.sql)
|
|
|
|
def test_close_cnxn(self):
|
|
"""Make sure using a Cursor after closing its connection doesn't crash."""
|
|
|
|
self.cursor.execute("create table t1(id integer, s varchar(20))")
|
|
self.cursor.execute("insert into t1 values (?,?)", 1, 'test')
|
|
self.cursor.execute("select * from t1")
|
|
|
|
self.cnxn.close()
|
|
|
|
# Now that the connection is closed, we expect an exception. (If the code attempts to use
|
|
# the HSTMT, we'll get an access violation instead.)
|
|
self.sql = "select * from t1"
|
|
self.assertRaises(pyodbc.ProgrammingError, self._exec)
|
|
|
|
def test_negative_row_index(self):
|
|
self.cursor.execute("create table t1(s varchar(20))")
|
|
self.cursor.execute("insert into t1 values(?)", "1")
|
|
row = self.cursor.execute("select * from t1").fetchone()
|
|
self.assertEqual(row[0], "1")
|
|
self.assertEqual(row[-1], "1")
|
|
|
|
def test_version(self):
|
|
self.assertEqual(3, len(pyodbc.version.split('.'))) # 1.3.1 etc.
|
|
|
|
#
|
|
# ints and floats
|
|
#
|
|
|
|
def test_int(self):
|
|
value = 1234
|
|
self.cursor.execute("create table t1(n int)")
|
|
self.cursor.execute("insert into t1 values (?)", value)
|
|
result = self.cursor.execute("select n from t1").fetchone()[0]
|
|
self.assertEqual(result, value)
|
|
|
|
def test_negative_int(self):
|
|
value = -1
|
|
self.cursor.execute("create table t1(n int)")
|
|
self.cursor.execute("insert into t1 values (?)", value)
|
|
result = self.cursor.execute("select n from t1").fetchone()[0]
|
|
self.assertEqual(result, value)
|
|
|
|
def test_bigint(self):
|
|
input = 3000000000
|
|
self.cursor.execute("create table t1(d bigint)")
|
|
self.cursor.execute("insert into t1 values (?)", input)
|
|
result = self.cursor.execute("select d from t1").fetchone()[0]
|
|
self.assertEqual(result, input)
|
|
|
|
def test_negative_bigint(self):
|
|
# Issue 186: BIGINT problem on 32-bit architecture
|
|
input = -430000000
|
|
self.cursor.execute("create table t1(d bigint)")
|
|
self.cursor.execute("insert into t1 values (?)", input)
|
|
result = self.cursor.execute("select d from t1").fetchone()[0]
|
|
self.assertEqual(result, input)
|
|
|
|
def test_float(self):
|
|
value = 1234.567
|
|
self.cursor.execute("create table t1(n float)")
|
|
self.cursor.execute("insert into t1 values (?)", value)
|
|
result = self.cursor.execute("select n from t1").fetchone()[0]
|
|
self.assertEqual(result, value)
|
|
|
|
def test_negative_float(self):
|
|
value = -200
|
|
self.cursor.execute("create table t1(n float)")
|
|
self.cursor.execute("insert into t1 values (?)", value)
|
|
result = self.cursor.execute("select n from t1").fetchone()[0]
|
|
self.assertEqual(value, result)
|
|
|
|
#
|
|
# rowcount
|
|
#
|
|
|
|
# Note: SQLRowCount does not define what the driver must return after a select statement
|
|
# and says that its value should not be relied upon. The sqliteodbc driver is hardcoded to
|
|
# return 0 so I've deleted the test.
|
|
|
|
def test_rowcount_delete(self):
|
|
self.assertEqual(self.cursor.rowcount, -1)
|
|
self.cursor.execute("create table t1(i int)")
|
|
count = 4
|
|
for i in range(count):
|
|
self.cursor.execute("insert into t1 values (?)", i)
|
|
self.cursor.execute("delete from t1")
|
|
self.assertEqual(self.cursor.rowcount, count)
|
|
|
|
def test_rowcount_nodata(self):
|
|
"""
|
|
This represents a different code path than a delete that deleted something.
|
|
|
|
The return value is SQL_NO_DATA and code after it was causing an error. We could use SQL_NO_DATA to step over
|
|
the code that errors out and drop down to the same SQLRowCount code. On the other hand, we could hardcode a
|
|
zero return value.
|
|
"""
|
|
self.cursor.execute("create table t1(i int)")
|
|
# This is a different code path internally.
|
|
self.cursor.execute("delete from t1")
|
|
self.assertEqual(self.cursor.rowcount, 0)
|
|
|
|
# In the 2.0.x branch, Cursor.execute sometimes returned the cursor and sometimes the rowcount. This proved very
|
|
# confusing when things went wrong and added very little value even when things went right since users could always
|
|
# use: cursor.execute("...").rowcount
|
|
|
|
def test_retcursor_delete(self):
|
|
self.cursor.execute("create table t1(i int)")
|
|
self.cursor.execute("insert into t1 values (1)")
|
|
v = self.cursor.execute("delete from t1")
|
|
self.assertEqual(v, self.cursor)
|
|
|
|
def test_retcursor_nodata(self):
|
|
"""
|
|
This represents a different code path than a delete that deleted something.
|
|
|
|
The return value is SQL_NO_DATA and code after it was causing an error. We could use SQL_NO_DATA to step over
|
|
the code that errors out and drop down to the same SQLRowCount code.
|
|
"""
|
|
self.cursor.execute("create table t1(i int)")
|
|
# This is a different code path internally.
|
|
v = self.cursor.execute("delete from t1")
|
|
self.assertEqual(v, self.cursor)
|
|
|
|
def test_retcursor_select(self):
|
|
self.cursor.execute("create table t1(i int)")
|
|
self.cursor.execute("insert into t1 values (1)")
|
|
v = self.cursor.execute("select * from t1")
|
|
self.assertEqual(v, self.cursor)
|
|
|
|
#
|
|
# misc
|
|
#
|
|
|
|
def test_lower_case(self):
|
|
"Ensure pyodbc.lowercase forces returned column names to lowercase."
|
|
|
|
# Has to be set before creating the cursor, so we must recreate self.cursor.
|
|
|
|
pyodbc.lowercase = True
|
|
self.cursor = self.cnxn.cursor()
|
|
|
|
self.cursor.execute("create table t1(Abc int, dEf int)")
|
|
self.cursor.execute("select * from t1")
|
|
|
|
names = [ t[0] for t in self.cursor.description ]
|
|
names.sort()
|
|
|
|
self.assertEqual(names, [ "abc", "def" ])
|
|
|
|
# Put it back so other tests don't fail.
|
|
pyodbc.lowercase = False
|
|
|
|
def test_row_description(self):
|
|
"""
|
|
Ensure Cursor.description is accessible as Row.cursor_description.
|
|
"""
|
|
self.cursor = self.cnxn.cursor()
|
|
self.cursor.execute("create table t1(a int, b char(3))")
|
|
self.cnxn.commit()
|
|
self.cursor.execute("insert into t1 values(1, 'abc')")
|
|
|
|
row = self.cursor.execute("select * from t1").fetchone()
|
|
|
|
self.assertEqual(self.cursor.description, row.cursor_description)
|
|
|
|
|
|
def test_executemany(self):
|
|
self.cursor.execute("create table t1(a int, b varchar(10))")
|
|
|
|
params = [ (i, str(i)) for i in range(1, 6) ]
|
|
|
|
self.cursor.executemany("insert into t1(a, b) values (?,?)", params)
|
|
|
|
count = self.cursor.execute("select count(*) from t1").fetchone()[0]
|
|
self.assertEqual(count, len(params))
|
|
|
|
self.cursor.execute("select a, b from t1 order by a")
|
|
rows = self.cursor.fetchall()
|
|
self.assertEqual(count, len(rows))
|
|
|
|
for param, row in zip(params, rows):
|
|
self.assertEqual(param[0], row[0])
|
|
self.assertEqual(param[1], row[1])
|
|
|
|
|
|
def test_executemany_one(self):
|
|
"Pass executemany a single sequence"
|
|
self.cursor.execute("create table t1(a int, b varchar(10))")
|
|
|
|
params = [ (1, "test") ]
|
|
|
|
self.cursor.executemany("insert into t1(a, b) values (?,?)", params)
|
|
|
|
count = self.cursor.execute("select count(*) from t1").fetchone()[0]
|
|
self.assertEqual(count, len(params))
|
|
|
|
self.cursor.execute("select a, b from t1 order by a")
|
|
rows = self.cursor.fetchall()
|
|
self.assertEqual(count, len(rows))
|
|
|
|
for param, row in zip(params, rows):
|
|
self.assertEqual(param[0], row[0])
|
|
self.assertEqual(param[1], row[1])
|
|
|
|
|
|
def test_executemany_failure(self):
|
|
"""
|
|
Ensure that an exception is raised if one query in an executemany fails.
|
|
"""
|
|
self.cursor.execute("create table t1(a int, b varchar(10))")
|
|
|
|
params = [ (1, 'good'),
|
|
('error', 'not an int'),
|
|
(3, 'good') ]
|
|
|
|
self.assertRaises(pyodbc.Error, self.cursor.executemany, "insert into t1(a, b) value (?, ?)", params)
|
|
|
|
|
|
def test_row_slicing(self):
|
|
self.cursor.execute("create table t1(a int, b int, c int, d int)");
|
|
self.cursor.execute("insert into t1 values(1,2,3,4)")
|
|
|
|
row = self.cursor.execute("select * from t1").fetchone()
|
|
|
|
result = row[:]
|
|
self.assertTrue(result is row)
|
|
|
|
result = row[:-1]
|
|
self.assertEqual(result, (1,2,3))
|
|
|
|
result = row[0:4]
|
|
self.assertTrue(result is row)
|
|
|
|
|
|
def test_row_repr(self):
|
|
self.cursor.execute("create table t1(a int, b int, c int, d int)");
|
|
self.cursor.execute("insert into t1 values(1,2,3,4)")
|
|
|
|
row = self.cursor.execute("select * from t1").fetchone()
|
|
|
|
result = str(row)
|
|
self.assertEqual(result, "(1, 2, 3, 4)")
|
|
|
|
result = str(row[:-1])
|
|
self.assertEqual(result, "(1, 2, 3)")
|
|
|
|
result = str(row[:1])
|
|
self.assertEqual(result, "(1,)")
|
|
|
|
|
|
def test_view_select(self):
|
|
# Reported in forum: Can't select from a view? I think I do this a lot, but another test never hurts.
|
|
|
|
# Create a table (t1) with 3 rows and a view (t2) into it.
|
|
self.cursor.execute("create table t1(c1 int identity(1, 1), c2 varchar(50))")
|
|
for i in range(3):
|
|
self.cursor.execute("insert into t1(c2) values (?)", "string%s" % i)
|
|
self.cursor.execute("create view t2 as select * from t1")
|
|
|
|
# Select from the view
|
|
self.cursor.execute("select * from t2")
|
|
rows = self.cursor.fetchall()
|
|
self.assertTrue(rows is not None)
|
|
self.assertTrue(len(rows) == 3)
|
|
|
|
def test_autocommit(self):
|
|
self.assertEqual(self.cnxn.autocommit, False)
|
|
|
|
othercnxn = pyodbc.connect(self.connection_string, autocommit=True)
|
|
self.assertEqual(othercnxn.autocommit, True)
|
|
|
|
othercnxn.autocommit = False
|
|
self.assertEqual(othercnxn.autocommit, False)
|
|
|
|
def test_skip(self):
|
|
# Insert 1, 2, and 3. Fetch 1, skip 2, fetch 3.
|
|
|
|
self.cursor.execute("create table t1(id int)");
|
|
for i in range(1, 5):
|
|
self.cursor.execute("insert into t1 values(?)", i)
|
|
self.cursor.execute("select id from t1 order by id")
|
|
self.assertEqual(self.cursor.fetchone()[0], 1)
|
|
self.cursor.skip(2)
|
|
self.assertEqual(self.cursor.fetchone()[0], 4)
|
|
|
|
def test_sets_execute(self):
|
|
# Only lists and tuples are allowed.
|
|
def f():
|
|
self.cursor.execute("create table t1 (word varchar (100))")
|
|
words = set (['a'])
|
|
self.cursor.execute("insert into t1 (word) VALUES (?)", [words])
|
|
|
|
self.assertRaises(pyodbc.ProgrammingError, f)
|
|
|
|
def test_sets_executemany(self):
|
|
# Only lists and tuples are allowed.
|
|
def f():
|
|
self.cursor.execute("create table t1 (word varchar (100))")
|
|
words = set (['a'])
|
|
self.cursor.executemany("insert into t1 (word) values (?)", [words])
|
|
|
|
self.assertRaises(TypeError, f)
|
|
|
|
def test_row_execute(self):
|
|
"Ensure we can use a Row object as a parameter to execute"
|
|
self.cursor.execute("create table t1(n int, s varchar(10))")
|
|
self.cursor.execute("insert into t1 values (1, 'a')")
|
|
row = self.cursor.execute("select n, s from t1").fetchone()
|
|
self.assertNotEqual(row, None)
|
|
|
|
self.cursor.execute("create table t2(n int, s varchar(10))")
|
|
self.cursor.execute("insert into t2 values (?, ?)", row)
|
|
|
|
def test_row_executemany(self):
|
|
"Ensure we can use a Row object as a parameter to executemany"
|
|
self.cursor.execute("create table t1(n int, s varchar(10))")
|
|
|
|
for i in range(3):
|
|
self.cursor.execute("insert into t1 values (?, ?)", i, chr(ord('a')+i))
|
|
|
|
rows = self.cursor.execute("select n, s from t1").fetchall()
|
|
self.assertNotEqual(len(rows), 0)
|
|
|
|
self.cursor.execute("create table t2(n int, s varchar(10))")
|
|
self.cursor.executemany("insert into t2 values (?, ?)", rows)
|
|
|
|
def test_description(self):
|
|
"Ensure cursor.description is correct"
|
|
|
|
self.cursor.execute("create table t1(n int, s text)")
|
|
self.cursor.execute("insert into t1 values (1, 'abc')")
|
|
self.cursor.execute("select * from t1")
|
|
|
|
# (I'm not sure the precision of an int is constant across different versions, bits, so I'm hand checking the
|
|
# items I do know.
|
|
|
|
# int
|
|
t = self.cursor.description[0]
|
|
self.assertEqual(t[0], 'n')
|
|
self.assertEqual(t[1], int)
|
|
self.assertEqual(t[5], 0) # scale
|
|
self.assertEqual(t[6], True) # nullable
|
|
|
|
# text
|
|
t = self.cursor.description[1]
|
|
self.assertEqual(t[0], 's')
|
|
self.assertEqual(t[1], str)
|
|
self.assertEqual(t[5], 0) # scale
|
|
self.assertEqual(t[6], True) # nullable
|
|
|
|
def test_row_equal(self):
|
|
self.cursor.execute("create table t1(n int, s varchar(20))")
|
|
self.cursor.execute("insert into t1 values (1, 'test')")
|
|
row1 = self.cursor.execute("select n, s from t1").fetchone()
|
|
row2 = self.cursor.execute("select n, s from t1").fetchone()
|
|
b = (row1 == row2)
|
|
self.assertEqual(b, True)
|
|
|
|
def test_row_gtlt(self):
|
|
self.cursor.execute("create table t1(n int, s varchar(20))")
|
|
self.cursor.execute("insert into t1 values (1, 'test1')")
|
|
self.cursor.execute("insert into t1 values (1, 'test2')")
|
|
rows = self.cursor.execute("select n, s from t1 order by s").fetchall()
|
|
self.assertTrue(rows[0] < rows[1])
|
|
self.assertTrue(rows[0] <= rows[1])
|
|
self.assertTrue(rows[1] > rows[0])
|
|
self.assertTrue(rows[1] >= rows[0])
|
|
self.assertTrue(rows[0] != rows[1])
|
|
|
|
rows = list(rows)
|
|
rows.sort() # uses <
|
|
|
|
def _test_context_manager(self):
|
|
# TODO: This is failing, but it may be due to the design of sqlite. I've disabled it
|
|
# for now until I can research it some more.
|
|
|
|
# WARNING: This isn't working right now. We've set the driver's autocommit to "off",
|
|
# but that doesn't automatically start a transaction. I'm not familiar enough with the
|
|
# internals of the driver to tell what is going on, but it looks like there is support
|
|
# for the autocommit flag.
|
|
#
|
|
# I thought it might be a timing issue, like it not actually starting a txn until you
|
|
# try to do something, but that doesn't seem to work either. I'll leave this in to
|
|
# remind us that it isn't working yet but we need to contact the SQLite ODBC driver
|
|
# author for some guidance.
|
|
|
|
with pyodbc.connect(self.connection_string) as cnxn:
|
|
cursor = cnxn.cursor()
|
|
cursor.execute("begin")
|
|
cursor.execute("create table t1(i int)")
|
|
cursor.execute('rollback')
|
|
|
|
# The connection should be closed now.
|
|
def test():
|
|
cnxn.execute('rollback')
|
|
self.assertRaises(pyodbc.Error, test)
|
|
|
|
def test_untyped_none(self):
|
|
# From issue 129
|
|
value = self.cursor.execute("select ?", None).fetchone()[0]
|
|
self.assertEqual(value, None)
|
|
|
|
def test_large_update_nodata(self):
|
|
self.cursor.execute('create table t1(a blob)')
|
|
hundredkb = 'x'*100*1024
|
|
self.cursor.execute('update t1 set a=? where 1=0', (hundredkb,))
|
|
|
|
def test_no_fetch(self):
|
|
# Issue 89 with FreeTDS: Multiple selects (or catalog functions that issue selects) without fetches seem to
|
|
# confuse the driver.
|
|
self.cursor.execute('select 1')
|
|
self.cursor.execute('select 1')
|
|
self.cursor.execute('select 1')
|
|
|
|
|
|
def main():
|
|
from argparse import ArgumentParser
|
|
parser = ArgumentParser(usage=usage)
|
|
parser.add_argument("-v", "--verbose", action="count", default=0, help="increment test verbosity (can be used multiple times)")
|
|
parser.add_argument("-d", "--debug", action="store_true", default=False, help="print debugging items")
|
|
parser.add_argument("-t", "--test", help="run only the named test")
|
|
parser.add_argument("conn_str", nargs="*", help="connection string for sqlite")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if len(args.conn_str) > 1:
|
|
parser.error('Only one argument is allowed. Do you need quotes around the connection string?')
|
|
|
|
if not args.conn_str:
|
|
connection_string = load_setup_connection_string('sqlitetests')
|
|
|
|
if not connection_string:
|
|
parser.print_help()
|
|
raise SystemExit()
|
|
else:
|
|
connection_string = args.conn_str[0]
|
|
|
|
if args.verbose:
|
|
cnxn = pyodbc.connect(connection_string)
|
|
print_library_info(cnxn)
|
|
cnxn.close()
|
|
|
|
suite = load_tests(SqliteTestCase, args.test, connection_string)
|
|
|
|
testRunner = unittest.TextTestRunner(verbosity=args.verbose)
|
|
result = testRunner.run(suite)
|
|
|
|
return result
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
# Add the build directory to the path so we're testing the latest build, not the installed version.
|
|
|
|
add_to_path()
|
|
|
|
import pyodbc
|
|
sys.exit(0 if main().wasSuccessful() else 1)
|