diff --git a/pyodbc-4.0.39.tar.gz b/pyodbc-4.0.39.tar.gz deleted file mode 100644 index 8383f9d..0000000 --- a/pyodbc-4.0.39.tar.gz +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:e528bb70dd6d6299ee429868925df0866e3e919c772b9eff79c8e17920d8f116 -size 282412 diff --git a/pyodbc-5.1.0.tar.gz b/pyodbc-5.1.0.tar.gz new file mode 100644 index 0000000..9ba96bb --- /dev/null +++ b/pyodbc-5.1.0.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:397feee44561a6580be08cedbe986436859563f4bb378f48224655c8e987ea60 +size 115450 diff --git a/python-pyodbc.changes b/python-pyodbc.changes index 33872f7..8539870 100644 --- a/python-pyodbc.changes +++ b/python-pyodbc.changes @@ -1,3 +1,17 @@ +------------------------------------------------------------------- +Sun Feb 11 04:15:21 UTC 2024 - John Vandenberg + +- Add sqlitetests.py & testutils.py +- Update to v5.1.0 + * Mac M1 & M2 binary builds +- from v5.0.1 + * bug fix for 5.0.0 that restores the ability to pass bytes objects + in the attrs_before parameter when connecting. + This is often used for Azure with a token. +- from 5.0.0 + * The API is backwards compatible. + * Dropped support for Python 2. + ------------------------------------------------------------------- Mon Apr 17 19:00:59 UTC 2023 - Dirk Müller diff --git a/python-pyodbc.spec b/python-pyodbc.spec index 358a7aa..8b67b8a 100644 --- a/python-pyodbc.spec +++ b/python-pyodbc.spec @@ -1,7 +1,7 @@ # # spec file for package python-pyodbc # -# Copyright (c) 2023 SUSE LLC +# Copyright (c) 2024 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -17,13 +17,16 @@ Name: python-pyodbc -Version: 4.0.39 +Version: 5.1.0 Release: 0 Summary: Python ODBC API License: MIT Group: Development/Languages/Python URL: https://github.com/mkleehammer/pyodbc Source: https://files.pythonhosted.org/packages/source/p/pyodbc/pyodbc-%{version}.tar.gz +Source1: https://raw.githubusercontent.com/mkleehammer/pyodbc/master/tests/old/sqlitetests.py +# testutils is a modified version of https://raw.githubusercontent.com/mkleehammer/pyodbc/44b620d8df1aa71926fb363b140d398bf5f2fc35/tests/testutils.py +Source2: testutils.py BuildRequires: %{python_module devel} BuildRequires: %{python_module setuptools} BuildRequires: gcc-c++ @@ -33,7 +36,7 @@ BuildRequires: unixODBC-devel %python_subpackages %description -pyodbc is a Python 2.x and 3.x module that allows you to use ODBC +pyodbc is a Python 3.x module that allows you to use ODBC to connect to almost any database. It implements the Python Database API Specification v2.0, but @@ -42,6 +45,7 @@ even more. %prep %setup -q -n pyodbc-%{version} +cp %{SOURCE1} %{SOURCE2} . %build export CFLAGS="%{optflags}" @@ -52,8 +56,8 @@ export CFLAGS="%{optflags}" %check export PYTHONDONTWRITEBYTECODE=1 -%{python_expand export PYTHONPATH=%{buildroot}%{$python_sitearch} export TESTDIRSUFFIX=%{$python_bin_suffix} -$python tests${TESTDIRSUFFIX::1}/sqlitetests.py -v "Driver=SQLITE3;Database=sqlite.db" +%{python_expand export PYTHONPATH=%{buildroot}%{$python_sitearch}:${PWD} +$python sqlitetests.py -v "Driver=SQLITE3;Database=sqlite.db" } %files %{python_files} diff --git a/sqlitetests.py b/sqlitetests.py new file mode 100644 index 0000000..4b85571 --- /dev/null +++ b/sqlitetests.py @@ -0,0 +1,692 @@ +#!/usr/bin/python + +usage = """\ +%(prog)s [options] connection_string + +Unit tests for SQLite using the ODBC driver from http://www.ch-werner.de/sqliteodbc + +To use, pass a connection string as the parameter. The tests will create and +drop tables t1 and t2 as necessary. On Windows, use the 32-bit driver with +32-bit Python and the 64-bit driver with 64-bit Python (regardless of your +operating system bitness). + +These run using the version from the 'build' directory, not the version +installed into the Python directories. You must run python setup.py build +before running the tests. + +You can also put the connection string into a tmp/setup.cfg file like so: + + [sqlitetests] + connection-string=Driver=SQLite3 ODBC Driver;Database=sqlite.db +""" + +import sys, os, re +import unittest +from decimal import Decimal +from datetime import datetime, date, time +from os.path import join, getsize, dirname, abspath +from testutils import * + +_TESTSTR = '0123456789-abcdefghijklmnopqrstuvwxyz-' + +def _generate_test_string(length): + """ + Returns a string of `length` characters, constructed by repeating _TESTSTR as necessary. + + To enhance performance, there are 3 ways data is read, based on the length of the value, so most data types are + tested with 3 lengths. This function helps us generate the test data. + + We use a recognizable data set instead of a single character to make it less likely that "overlap" errors will + be hidden and to help us manually identify where a break occurs. + """ + if length <= len(_TESTSTR): + return _TESTSTR[:length] + + c = (length + len(_TESTSTR)-1) // len(_TESTSTR) + v = _TESTSTR * c + return v[:length] + +class SqliteTestCase(unittest.TestCase): + + SMALL_FENCEPOST_SIZES = [ 0, 1, 255, 256, 510, 511, 512, 1023, 1024, 2047, 2048, 4000 ] + LARGE_FENCEPOST_SIZES = [ 4095, 4096, 4097, 10 * 1024, 20 * 1024 ] + + STR_FENCEPOSTS = [ _generate_test_string(size) for size in SMALL_FENCEPOST_SIZES ] + BYTE_FENCEPOSTS = [ bytes(s, 'ascii') for s in STR_FENCEPOSTS ] + IMAGE_FENCEPOSTS = BYTE_FENCEPOSTS + [ bytes(_generate_test_string(size), 'ascii') for size in LARGE_FENCEPOST_SIZES ] + + def __init__(self, method_name, connection_string): + unittest.TestCase.__init__(self, method_name) + self.connection_string = connection_string + + def setUp(self): + self.cnxn = pyodbc.connect(self.connection_string) + self.cursor = self.cnxn.cursor() + + for i in range(3): + try: + self.cursor.execute("drop table t%d" % i) + self.cnxn.commit() + except: + pass + + self.cnxn.rollback() + + def tearDown(self): + try: + self.cursor.close() + self.cnxn.close() + except: + # If we've already closed the cursor or connection, exceptions are thrown. + pass + + def test_multiple_bindings(self): + "More than one bind and select on a cursor" + self.cursor.execute("create table t1(n int)") + self.cursor.execute("insert into t1 values (?)", 1) + self.cursor.execute("insert into t1 values (?)", 2) + self.cursor.execute("insert into t1 values (?)", 3) + for i in range(3): + self.cursor.execute("select n from t1 where n < ?", 10) + self.cursor.execute("select n from t1 where n < 3") + + + def test_different_bindings(self): + self.cursor.execute("create table t1(n int)") + self.cursor.execute("create table t2(d datetime)") + self.cursor.execute("insert into t1 values (?)", 1) + self.cursor.execute("insert into t2 values (?)", datetime.now()) + + def test_drivers(self): + p = pyodbc.drivers() + self.assertTrue(isinstance(p, list)) + + def test_datasources(self): + p = pyodbc.dataSources() + self.assertTrue(isinstance(p, dict)) + + def test_getinfo_string(self): + value = self.cnxn.getinfo(pyodbc.SQL_CATALOG_NAME_SEPARATOR) + self.assertTrue(isinstance(value, str)) + + def test_getinfo_bool(self): + value = self.cnxn.getinfo(pyodbc.SQL_ACCESSIBLE_TABLES) + self.assertTrue(isinstance(value, bool)) + + def test_getinfo_int(self): + value = self.cnxn.getinfo(pyodbc.SQL_DEFAULT_TXN_ISOLATION) + self.assertTrue(isinstance(value, int)) + + def test_getinfo_smallint(self): + value = self.cnxn.getinfo(pyodbc.SQL_CONCAT_NULL_BEHAVIOR) + self.assertTrue(isinstance(value, int)) + + def _test_strtype(self, sqltype, value, colsize=None): + """ + The implementation for string, Unicode, and binary tests. + """ + assert colsize is None or (value is None or colsize >= len(value)) + + if colsize: + sql = "create table t1(s {}({}))".format(sqltype, colsize) + else: + sql = "create table t1(s %s)" % sqltype + + self.cursor.execute(sql) + self.cursor.execute("insert into t1 values(?)", value) + v = self.cursor.execute("select * from t1").fetchone()[0] + self.assertEqual(type(v), type(value)) + + if value is not None: + self.assertEqual(len(v), len(value)) + + self.assertEqual(v, value) + + # Reported by Andy Hochhaus in the pyodbc group: In 2.1.7 and earlier, a hardcoded length of 255 was used to + # determine whether a parameter was bound as a SQL_VARCHAR or SQL_LONGVARCHAR. Apparently SQL Server chokes if + # we bind as a SQL_LONGVARCHAR and the target column size is 8000 or less, which is considers just SQL_VARCHAR. + # This means binding a 256 character value would cause problems if compared with a VARCHAR column under + # 8001. We now use SQLGetTypeInfo to determine the time to switch. + # + # [42000] [Microsoft][SQL Server Native Client 10.0][SQL Server]The data types varchar and text are incompatible in the equal to operator. + + self.cursor.execute("select * from t1 where s=?", value) + + + def _test_strliketype(self, sqltype, value, colsize=None): + """ + The implementation for text, image, ntext, and binary. + + These types do not support comparison operators. + """ + assert colsize is None or (value is None or colsize >= len(value)) + + if colsize: + sql = "create table t1(s {}({}))".format(sqltype, colsize) + else: + sql = "create table t1(s %s)" % sqltype + + self.cursor.execute(sql) + self.cursor.execute("insert into t1 values(?)", value) + v = self.cursor.execute("select * from t1").fetchone()[0] + self.assertEqual(type(v), type(value)) + + if value is not None: + self.assertEqual(len(v), len(value)) + + self.assertEqual(v, value) + + # + # text + # + + def test_text_null(self): + self._test_strtype('text', None, 100) + + # Generate a test for each fencepost size: test_text_0, etc. + def _maketest(value): + def t(self): + self._test_strtype('text', value, len(value)) + return t + for value in STR_FENCEPOSTS: + locals()['test_text_%s' % len(value)] = _maketest(value) + + def test_text_upperlatin(self): + self._test_strtype('varchar', 'á') + + # + # blob + # + + def test_null_blob(self): + self._test_strtype('blob', None, 100) + + def test_large_null_blob(self): + # Bug 1575064 + self._test_strtype('blob', None, 4000) + + # Generate a test for each fencepost size: test_unicode_0, etc. + def _maketest(value): + def t(self): + self._test_strtype('blob', value, len(value)) + return t + for value in BYTE_FENCEPOSTS: + locals()['test_blob_%s' % len(value)] = _maketest(value) + + def test_subquery_params(self): + """Ensure parameter markers work in a subquery""" + self.cursor.execute("create table t1(id integer, s varchar(20))") + self.cursor.execute("insert into t1 values (?,?)", 1, 'test') + row = self.cursor.execute(""" + select x.id + from ( + select id + from t1 + where s = ? + and id between ? and ? + ) x + """, 'test', 1, 10).fetchone() + self.assertNotEqual(row, None) + self.assertEqual(row[0], 1) + + def _exec(self): + self.cursor.execute(self.sql) + + def test_close_cnxn(self): + """Make sure using a Cursor after closing its connection doesn't crash.""" + + self.cursor.execute("create table t1(id integer, s varchar(20))") + self.cursor.execute("insert into t1 values (?,?)", 1, 'test') + self.cursor.execute("select * from t1") + + self.cnxn.close() + + # Now that the connection is closed, we expect an exception. (If the code attempts to use + # the HSTMT, we'll get an access violation instead.) + self.sql = "select * from t1" + self.assertRaises(pyodbc.ProgrammingError, self._exec) + + def test_negative_row_index(self): + self.cursor.execute("create table t1(s varchar(20))") + self.cursor.execute("insert into t1 values(?)", "1") + row = self.cursor.execute("select * from t1").fetchone() + self.assertEqual(row[0], "1") + self.assertEqual(row[-1], "1") + + def test_version(self): + self.assertEqual(3, len(pyodbc.version.split('.'))) # 1.3.1 etc. + + # + # ints and floats + # + + def test_int(self): + value = 1234 + self.cursor.execute("create table t1(n int)") + self.cursor.execute("insert into t1 values (?)", value) + result = self.cursor.execute("select n from t1").fetchone()[0] + self.assertEqual(result, value) + + def test_negative_int(self): + value = -1 + self.cursor.execute("create table t1(n int)") + self.cursor.execute("insert into t1 values (?)", value) + result = self.cursor.execute("select n from t1").fetchone()[0] + self.assertEqual(result, value) + + def test_bigint(self): + input = 3000000000 + self.cursor.execute("create table t1(d bigint)") + self.cursor.execute("insert into t1 values (?)", input) + result = self.cursor.execute("select d from t1").fetchone()[0] + self.assertEqual(result, input) + + def test_negative_bigint(self): + # Issue 186: BIGINT problem on 32-bit architecture + input = -430000000 + self.cursor.execute("create table t1(d bigint)") + self.cursor.execute("insert into t1 values (?)", input) + result = self.cursor.execute("select d from t1").fetchone()[0] + self.assertEqual(result, input) + + def test_float(self): + value = 1234.567 + self.cursor.execute("create table t1(n float)") + self.cursor.execute("insert into t1 values (?)", value) + result = self.cursor.execute("select n from t1").fetchone()[0] + self.assertEqual(result, value) + + def test_negative_float(self): + value = -200 + self.cursor.execute("create table t1(n float)") + self.cursor.execute("insert into t1 values (?)", value) + result = self.cursor.execute("select n from t1").fetchone()[0] + self.assertEqual(value, result) + + # + # rowcount + # + + # Note: SQLRowCount does not define what the driver must return after a select statement + # and says that its value should not be relied upon. The sqliteodbc driver is hardcoded to + # return 0 so I've deleted the test. + + def test_rowcount_delete(self): + self.assertEqual(self.cursor.rowcount, -1) + self.cursor.execute("create table t1(i int)") + count = 4 + for i in range(count): + self.cursor.execute("insert into t1 values (?)", i) + self.cursor.execute("delete from t1") + self.assertEqual(self.cursor.rowcount, count) + + def test_rowcount_nodata(self): + """ + This represents a different code path than a delete that deleted something. + + The return value is SQL_NO_DATA and code after it was causing an error. We could use SQL_NO_DATA to step over + the code that errors out and drop down to the same SQLRowCount code. On the other hand, we could hardcode a + zero return value. + """ + self.cursor.execute("create table t1(i int)") + # This is a different code path internally. + self.cursor.execute("delete from t1") + self.assertEqual(self.cursor.rowcount, 0) + + # In the 2.0.x branch, Cursor.execute sometimes returned the cursor and sometimes the rowcount. This proved very + # confusing when things went wrong and added very little value even when things went right since users could always + # use: cursor.execute("...").rowcount + + def test_retcursor_delete(self): + self.cursor.execute("create table t1(i int)") + self.cursor.execute("insert into t1 values (1)") + v = self.cursor.execute("delete from t1") + self.assertEqual(v, self.cursor) + + def test_retcursor_nodata(self): + """ + This represents a different code path than a delete that deleted something. + + The return value is SQL_NO_DATA and code after it was causing an error. We could use SQL_NO_DATA to step over + the code that errors out and drop down to the same SQLRowCount code. + """ + self.cursor.execute("create table t1(i int)") + # This is a different code path internally. + v = self.cursor.execute("delete from t1") + self.assertEqual(v, self.cursor) + + def test_retcursor_select(self): + self.cursor.execute("create table t1(i int)") + self.cursor.execute("insert into t1 values (1)") + v = self.cursor.execute("select * from t1") + self.assertEqual(v, self.cursor) + + # + # misc + # + + def test_lower_case(self): + "Ensure pyodbc.lowercase forces returned column names to lowercase." + + # Has to be set before creating the cursor, so we must recreate self.cursor. + + pyodbc.lowercase = True + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1(Abc int, dEf int)") + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ "abc", "def" ]) + + # Put it back so other tests don't fail. + pyodbc.lowercase = False + + def test_row_description(self): + """ + Ensure Cursor.description is accessible as Row.cursor_description. + """ + self.cursor = self.cnxn.cursor() + self.cursor.execute("create table t1(a int, b char(3))") + self.cnxn.commit() + self.cursor.execute("insert into t1 values(1, 'abc')") + + row = self.cursor.execute("select * from t1").fetchone() + + self.assertEqual(self.cursor.description, row.cursor_description) + + + def test_executemany(self): + self.cursor.execute("create table t1(a int, b varchar(10))") + + params = [ (i, str(i)) for i in range(1, 6) ] + + self.cursor.executemany("insert into t1(a, b) values (?,?)", params) + + count = self.cursor.execute("select count(*) from t1").fetchone()[0] + self.assertEqual(count, len(params)) + + self.cursor.execute("select a, b from t1 order by a") + rows = self.cursor.fetchall() + self.assertEqual(count, len(rows)) + + for param, row in zip(params, rows): + self.assertEqual(param[0], row[0]) + self.assertEqual(param[1], row[1]) + + + def test_executemany_one(self): + "Pass executemany a single sequence" + self.cursor.execute("create table t1(a int, b varchar(10))") + + params = [ (1, "test") ] + + self.cursor.executemany("insert into t1(a, b) values (?,?)", params) + + count = self.cursor.execute("select count(*) from t1").fetchone()[0] + self.assertEqual(count, len(params)) + + self.cursor.execute("select a, b from t1 order by a") + rows = self.cursor.fetchall() + self.assertEqual(count, len(rows)) + + for param, row in zip(params, rows): + self.assertEqual(param[0], row[0]) + self.assertEqual(param[1], row[1]) + + + def test_executemany_failure(self): + """ + Ensure that an exception is raised if one query in an executemany fails. + """ + self.cursor.execute("create table t1(a int, b varchar(10))") + + params = [ (1, 'good'), + ('error', 'not an int'), + (3, 'good') ] + + self.assertRaises(pyodbc.Error, self.cursor.executemany, "insert into t1(a, b) value (?, ?)", params) + + + def test_row_slicing(self): + self.cursor.execute("create table t1(a int, b int, c int, d int)"); + self.cursor.execute("insert into t1 values(1,2,3,4)") + + row = self.cursor.execute("select * from t1").fetchone() + + result = row[:] + self.assertTrue(result is row) + + result = row[:-1] + self.assertEqual(result, (1,2,3)) + + result = row[0:4] + self.assertTrue(result is row) + + + def test_row_repr(self): + self.cursor.execute("create table t1(a int, b int, c int, d int)"); + self.cursor.execute("insert into t1 values(1,2,3,4)") + + row = self.cursor.execute("select * from t1").fetchone() + + result = str(row) + self.assertEqual(result, "(1, 2, 3, 4)") + + result = str(row[:-1]) + self.assertEqual(result, "(1, 2, 3)") + + result = str(row[:1]) + self.assertEqual(result, "(1,)") + + + def test_view_select(self): + # Reported in forum: Can't select from a view? I think I do this a lot, but another test never hurts. + + # Create a table (t1) with 3 rows and a view (t2) into it. + self.cursor.execute("create table t1(c1 int identity(1, 1), c2 varchar(50))") + for i in range(3): + self.cursor.execute("insert into t1(c2) values (?)", "string%s" % i) + self.cursor.execute("create view t2 as select * from t1") + + # Select from the view + self.cursor.execute("select * from t2") + rows = self.cursor.fetchall() + self.assertTrue(rows is not None) + self.assertTrue(len(rows) == 3) + + def test_autocommit(self): + self.assertEqual(self.cnxn.autocommit, False) + + othercnxn = pyodbc.connect(self.connection_string, autocommit=True) + self.assertEqual(othercnxn.autocommit, True) + + othercnxn.autocommit = False + self.assertEqual(othercnxn.autocommit, False) + + def test_skip(self): + # Insert 1, 2, and 3. Fetch 1, skip 2, fetch 3. + + self.cursor.execute("create table t1(id int)"); + for i in range(1, 5): + self.cursor.execute("insert into t1 values(?)", i) + self.cursor.execute("select id from t1 order by id") + self.assertEqual(self.cursor.fetchone()[0], 1) + self.cursor.skip(2) + self.assertEqual(self.cursor.fetchone()[0], 4) + + def test_sets_execute(self): + # Only lists and tuples are allowed. + def f(): + self.cursor.execute("create table t1 (word varchar (100))") + words = set (['a']) + self.cursor.execute("insert into t1 (word) VALUES (?)", [words]) + + self.assertRaises(pyodbc.ProgrammingError, f) + + def test_sets_executemany(self): + # Only lists and tuples are allowed. + def f(): + self.cursor.execute("create table t1 (word varchar (100))") + words = set (['a']) + self.cursor.executemany("insert into t1 (word) values (?)", [words]) + + self.assertRaises(TypeError, f) + + def test_row_execute(self): + "Ensure we can use a Row object as a parameter to execute" + self.cursor.execute("create table t1(n int, s varchar(10))") + self.cursor.execute("insert into t1 values (1, 'a')") + row = self.cursor.execute("select n, s from t1").fetchone() + self.assertNotEqual(row, None) + + self.cursor.execute("create table t2(n int, s varchar(10))") + self.cursor.execute("insert into t2 values (?, ?)", row) + + def test_row_executemany(self): + "Ensure we can use a Row object as a parameter to executemany" + self.cursor.execute("create table t1(n int, s varchar(10))") + + for i in range(3): + self.cursor.execute("insert into t1 values (?, ?)", i, chr(ord('a')+i)) + + rows = self.cursor.execute("select n, s from t1").fetchall() + self.assertNotEqual(len(rows), 0) + + self.cursor.execute("create table t2(n int, s varchar(10))") + self.cursor.executemany("insert into t2 values (?, ?)", rows) + + def test_description(self): + "Ensure cursor.description is correct" + + self.cursor.execute("create table t1(n int, s text)") + self.cursor.execute("insert into t1 values (1, 'abc')") + self.cursor.execute("select * from t1") + + # (I'm not sure the precision of an int is constant across different versions, bits, so I'm hand checking the + # items I do know. + + # int + t = self.cursor.description[0] + self.assertEqual(t[0], 'n') + self.assertEqual(t[1], int) + self.assertEqual(t[5], 0) # scale + self.assertEqual(t[6], True) # nullable + + # text + t = self.cursor.description[1] + self.assertEqual(t[0], 's') + self.assertEqual(t[1], str) + self.assertEqual(t[5], 0) # scale + self.assertEqual(t[6], True) # nullable + + def test_row_equal(self): + self.cursor.execute("create table t1(n int, s varchar(20))") + self.cursor.execute("insert into t1 values (1, 'test')") + row1 = self.cursor.execute("select n, s from t1").fetchone() + row2 = self.cursor.execute("select n, s from t1").fetchone() + b = (row1 == row2) + self.assertEqual(b, True) + + def test_row_gtlt(self): + self.cursor.execute("create table t1(n int, s varchar(20))") + self.cursor.execute("insert into t1 values (1, 'test1')") + self.cursor.execute("insert into t1 values (1, 'test2')") + rows = self.cursor.execute("select n, s from t1 order by s").fetchall() + self.assertTrue(rows[0] < rows[1]) + self.assertTrue(rows[0] <= rows[1]) + self.assertTrue(rows[1] > rows[0]) + self.assertTrue(rows[1] >= rows[0]) + self.assertTrue(rows[0] != rows[1]) + + rows = list(rows) + rows.sort() # uses < + + def _test_context_manager(self): + # TODO: This is failing, but it may be due to the design of sqlite. I've disabled it + # for now until I can research it some more. + + # WARNING: This isn't working right now. We've set the driver's autocommit to "off", + # but that doesn't automatically start a transaction. I'm not familiar enough with the + # internals of the driver to tell what is going on, but it looks like there is support + # for the autocommit flag. + # + # I thought it might be a timing issue, like it not actually starting a txn until you + # try to do something, but that doesn't seem to work either. I'll leave this in to + # remind us that it isn't working yet but we need to contact the SQLite ODBC driver + # author for some guidance. + + with pyodbc.connect(self.connection_string) as cnxn: + cursor = cnxn.cursor() + cursor.execute("begin") + cursor.execute("create table t1(i int)") + cursor.execute('rollback') + + # The connection should be closed now. + def test(): + cnxn.execute('rollback') + self.assertRaises(pyodbc.Error, test) + + def test_untyped_none(self): + # From issue 129 + value = self.cursor.execute("select ?", None).fetchone()[0] + self.assertEqual(value, None) + + def test_large_update_nodata(self): + self.cursor.execute('create table t1(a blob)') + hundredkb = 'x'*100*1024 + self.cursor.execute('update t1 set a=? where 1=0', (hundredkb,)) + + def test_no_fetch(self): + # Issue 89 with FreeTDS: Multiple selects (or catalog functions that issue selects) without fetches seem to + # confuse the driver. + self.cursor.execute('select 1') + self.cursor.execute('select 1') + self.cursor.execute('select 1') + + +def main(): + from argparse import ArgumentParser + parser = ArgumentParser(usage=usage) + parser.add_argument("-v", "--verbose", action="count", default=0, help="increment test verbosity (can be used multiple times)") + parser.add_argument("-d", "--debug", action="store_true", default=False, help="print debugging items") + parser.add_argument("-t", "--test", help="run only the named test") + parser.add_argument("conn_str", nargs="*", help="connection string for sqlite") + + args = parser.parse_args() + + if len(args.conn_str) > 1: + parser.error('Only one argument is allowed. Do you need quotes around the connection string?') + + if not args.conn_str: + connection_string = load_setup_connection_string('sqlitetests') + + if not connection_string: + parser.print_help() + raise SystemExit() + else: + connection_string = args.conn_str[0] + + if args.verbose: + cnxn = pyodbc.connect(connection_string) + print_library_info(cnxn) + cnxn.close() + + suite = load_tests(SqliteTestCase, args.test, connection_string) + + testRunner = unittest.TextTestRunner(verbosity=args.verbose) + result = testRunner.run(suite) + + return result + + +if __name__ == '__main__': + + # Add the build directory to the path so we're testing the latest build, not the installed version. + + add_to_path() + + import pyodbc + sys.exit(0 if main().wasSuccessful() else 1) diff --git a/testutils.py b/testutils.py new file mode 100644 index 0000000..1fa9942 --- /dev/null +++ b/testutils.py @@ -0,0 +1,122 @@ +from datetime import datetime +import importlib.machinery +import os +from os.path import join, dirname, abspath +import platform +import sys +import unittest + + +def add_to_path(): + """ + Prepends the build directory to the path so that newly built pyodbc libraries are + used, allowing it to be tested without pip-installing it. + """ + # look for the suffixes used in the build filenames, e.g. ".cp38-win_amd64.pyd", + # ".cpython-38-darwin.so", ".cpython-38-x86_64-linux-gnu.so", etc. + library_exts = [ext for ext in importlib.machinery.EXTENSION_SUFFIXES if ext != '.pyd'] + # generate the name of the pyodbc build file(s) + library_names = ['pyodbc%s' % ext for ext in library_exts] + + # the build directory is assumed to be one directory up from this file + build_dir = join(dirname(dirname(abspath(__file__))), 'build') + + # find all the relevant pyodbc build files, and get their modified dates + file_info = [ + (os.path.getmtime(join(dirpath, file)), join(dirpath, file)) + for dirpath, dirs, files in os.walk(build_dir) + for file in files + if file in library_names + ] + if file_info: + file_info.sort() # put them in chronological order + library_modified_dt, library_path = file_info[-1] # use the latest one + # add the build directory to the Python path + sys.path.insert(0, dirname(library_path)) + print('Library: {} (last modified {})'.format(library_path, datetime.fromtimestamp(library_modified_dt))) + else: + print('Did not find the pyodbc library in the build directory. Will use the installed version.') + + +def print_library_info(cnxn): + import pyodbc + print('python: %s' % sys.version) + print('pyodbc: %s %s' % (pyodbc.version, os.path.abspath(pyodbc.__file__))) + print('odbc: %s' % cnxn.getinfo(pyodbc.SQL_ODBC_VER)) + print('driver: %s %s' % (cnxn.getinfo(pyodbc.SQL_DRIVER_NAME), cnxn.getinfo(pyodbc.SQL_DRIVER_VER))) + print(' supports ODBC version %s' % cnxn.getinfo(pyodbc.SQL_DRIVER_ODBC_VER)) + print('os: %s' % platform.system()) + print('unicode: SQLWCHAR=%s' % pyodbc.SQLWCHAR_SIZE) + + cursor = cnxn.cursor() + for typename in ['VARCHAR', 'WVARCHAR', 'BINARY']: + t = getattr(pyodbc, 'SQL_' + typename) + cursor.getTypeInfo(t) + row = cursor.fetchone() + print('Max %s = %s' % (typename, row and row[2] or '(not supported)')) + + if platform.system() == 'Windows': + print(' %s' % ' '.join([s for s in platform.win32_ver() if s])) + + +def discover_and_run(top_level_dir='.', start_dir='.', pattern='test*.py', verbosity=0): + """Finds all the test cases in the start directory and runs them""" + tests = unittest.defaultTestLoader.discover(top_level_dir=top_level_dir, start_dir=start_dir, pattern=pattern) + runner = unittest.TextTestRunner(verbosity=verbosity) + result = runner.run(tests) + return result + + +def load_tests(testclass, name, *args): + """ + Returns a TestSuite for tests in `testclass`. + + name + Optional test name if you only want to run 1 test. If not provided all tests in `testclass` will be loaded. + + args + Arguments for the test class constructor. These will be passed after the test method name. + """ + if name: + if not name.startswith('test_'): + name = 'test_%s' % name + names = [ name ] + + else: + names = [ method for method in dir(testclass) if method.startswith('test_') ] + + return unittest.TestSuite([ testclass(name, *args) for name in names ]) + + +def load_setup_connection_string(section): + """ + Attempts to read the default connection string from the setup.cfg file. + + If the file does not exist or if it exists but does not contain the connection string, None is returned. If the + file exists but cannot be parsed, an exception is raised. + """ + from os.path import exists, join, dirname + from configparser import ConfigParser + + FILENAME = 'setup.cfg' + KEY = 'connection-string' + + path = dirname(abspath(__file__)) + while True: + fqn = join(path, 'tmp', FILENAME) + if exists(fqn): + break + parent = dirname(path) + print('{} --> {}'.format(path, parent)) + if parent == path: + return None + path = parent + + try: + p = ConfigParser() + p.read(fqn) + except: + raise SystemExit('Unable to parse %s: %s' % (path, sys.exc_info()[1])) + + if p.has_option(section, KEY): + return p.get(section, KEY)