forked from pool/python-pyodbc
Accepting request 1145945 from home:jayvdb:branches:devel:languages:python
- Add sqlitetests.py & testutils.py - Update to v5.1.0 OBS-URL: https://build.opensuse.org/request/show/1145945 OBS-URL: https://build.opensuse.org/package/show/devel:languages:python/python-pyodbc?expand=0&rev=29
This commit is contained in:
parent
69207657e8
commit
f4b9cb2504
@ -1,3 +0,0 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:e528bb70dd6d6299ee429868925df0866e3e919c772b9eff79c8e17920d8f116
|
||||
size 282412
|
3
pyodbc-5.1.0.tar.gz
Normal file
3
pyodbc-5.1.0.tar.gz
Normal file
@ -0,0 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:397feee44561a6580be08cedbe986436859563f4bb378f48224655c8e987ea60
|
||||
size 115450
|
@ -1,3 +1,17 @@
|
||||
-------------------------------------------------------------------
|
||||
Sun Feb 11 04:15:21 UTC 2024 - John Vandenberg <jayvdb@gmail.com>
|
||||
|
||||
- Add sqlitetests.py & testutils.py
|
||||
- Update to v5.1.0
|
||||
* Mac M1 & M2 binary builds
|
||||
- from v5.0.1
|
||||
* bug fix for 5.0.0 that restores the ability to pass bytes objects
|
||||
in the attrs_before parameter when connecting.
|
||||
This is often used for Azure with a token.
|
||||
- from 5.0.0
|
||||
* The API is backwards compatible.
|
||||
* Dropped support for Python 2.
|
||||
|
||||
-------------------------------------------------------------------
|
||||
Mon Apr 17 19:00:59 UTC 2023 - Dirk Müller <dmueller@suse.com>
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
#
|
||||
# spec file for package python-pyodbc
|
||||
#
|
||||
# Copyright (c) 2023 SUSE LLC
|
||||
# Copyright (c) 2024 SUSE LLC
|
||||
#
|
||||
# All modifications and additions to the file contributed by third parties
|
||||
# remain the property of their copyright owners, unless otherwise agreed
|
||||
@ -17,13 +17,16 @@
|
||||
|
||||
|
||||
Name: python-pyodbc
|
||||
Version: 4.0.39
|
||||
Version: 5.1.0
|
||||
Release: 0
|
||||
Summary: Python ODBC API
|
||||
License: MIT
|
||||
Group: Development/Languages/Python
|
||||
URL: https://github.com/mkleehammer/pyodbc
|
||||
Source: https://files.pythonhosted.org/packages/source/p/pyodbc/pyodbc-%{version}.tar.gz
|
||||
Source1: https://raw.githubusercontent.com/mkleehammer/pyodbc/master/tests/old/sqlitetests.py
|
||||
# testutils is a modified version of https://raw.githubusercontent.com/mkleehammer/pyodbc/44b620d8df1aa71926fb363b140d398bf5f2fc35/tests/testutils.py
|
||||
Source2: testutils.py
|
||||
BuildRequires: %{python_module devel}
|
||||
BuildRequires: %{python_module setuptools}
|
||||
BuildRequires: gcc-c++
|
||||
@ -33,7 +36,7 @@ BuildRequires: unixODBC-devel
|
||||
%python_subpackages
|
||||
|
||||
%description
|
||||
pyodbc is a Python 2.x and 3.x module that allows you to use ODBC
|
||||
pyodbc is a Python 3.x module that allows you to use ODBC
|
||||
to connect to almost any database.
|
||||
|
||||
It implements the Python Database API Specification v2.0, but
|
||||
@ -42,6 +45,7 @@ even more.
|
||||
|
||||
%prep
|
||||
%setup -q -n pyodbc-%{version}
|
||||
cp %{SOURCE1} %{SOURCE2} .
|
||||
|
||||
%build
|
||||
export CFLAGS="%{optflags}"
|
||||
@ -52,8 +56,8 @@ export CFLAGS="%{optflags}"
|
||||
|
||||
%check
|
||||
export PYTHONDONTWRITEBYTECODE=1
|
||||
%{python_expand export PYTHONPATH=%{buildroot}%{$python_sitearch} export TESTDIRSUFFIX=%{$python_bin_suffix}
|
||||
$python tests${TESTDIRSUFFIX::1}/sqlitetests.py -v "Driver=SQLITE3;Database=sqlite.db"
|
||||
%{python_expand export PYTHONPATH=%{buildroot}%{$python_sitearch}:${PWD}
|
||||
$python sqlitetests.py -v "Driver=SQLITE3;Database=sqlite.db"
|
||||
}
|
||||
|
||||
%files %{python_files}
|
||||
|
692
sqlitetests.py
Normal file
692
sqlitetests.py
Normal file
@ -0,0 +1,692 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
usage = """\
|
||||
%(prog)s [options] connection_string
|
||||
|
||||
Unit tests for SQLite using the ODBC driver from http://www.ch-werner.de/sqliteodbc
|
||||
|
||||
To use, pass a connection string as the parameter. The tests will create and
|
||||
drop tables t1 and t2 as necessary. On Windows, use the 32-bit driver with
|
||||
32-bit Python and the 64-bit driver with 64-bit Python (regardless of your
|
||||
operating system bitness).
|
||||
|
||||
These run using the version from the 'build' directory, not the version
|
||||
installed into the Python directories. You must run python setup.py build
|
||||
before running the tests.
|
||||
|
||||
You can also put the connection string into a tmp/setup.cfg file like so:
|
||||
|
||||
[sqlitetests]
|
||||
connection-string=Driver=SQLite3 ODBC Driver;Database=sqlite.db
|
||||
"""
|
||||
|
||||
import sys, os, re
|
||||
import unittest
|
||||
from decimal import Decimal
|
||||
from datetime import datetime, date, time
|
||||
from os.path import join, getsize, dirname, abspath
|
||||
from testutils import *
|
||||
|
||||
_TESTSTR = '0123456789-abcdefghijklmnopqrstuvwxyz-'
|
||||
|
||||
def _generate_test_string(length):
|
||||
"""
|
||||
Returns a string of `length` characters, constructed by repeating _TESTSTR as necessary.
|
||||
|
||||
To enhance performance, there are 3 ways data is read, based on the length of the value, so most data types are
|
||||
tested with 3 lengths. This function helps us generate the test data.
|
||||
|
||||
We use a recognizable data set instead of a single character to make it less likely that "overlap" errors will
|
||||
be hidden and to help us manually identify where a break occurs.
|
||||
"""
|
||||
if length <= len(_TESTSTR):
|
||||
return _TESTSTR[:length]
|
||||
|
||||
c = (length + len(_TESTSTR)-1) // len(_TESTSTR)
|
||||
v = _TESTSTR * c
|
||||
return v[:length]
|
||||
|
||||
class SqliteTestCase(unittest.TestCase):
|
||||
|
||||
SMALL_FENCEPOST_SIZES = [ 0, 1, 255, 256, 510, 511, 512, 1023, 1024, 2047, 2048, 4000 ]
|
||||
LARGE_FENCEPOST_SIZES = [ 4095, 4096, 4097, 10 * 1024, 20 * 1024 ]
|
||||
|
||||
STR_FENCEPOSTS = [ _generate_test_string(size) for size in SMALL_FENCEPOST_SIZES ]
|
||||
BYTE_FENCEPOSTS = [ bytes(s, 'ascii') for s in STR_FENCEPOSTS ]
|
||||
IMAGE_FENCEPOSTS = BYTE_FENCEPOSTS + [ bytes(_generate_test_string(size), 'ascii') for size in LARGE_FENCEPOST_SIZES ]
|
||||
|
||||
def __init__(self, method_name, connection_string):
|
||||
unittest.TestCase.__init__(self, method_name)
|
||||
self.connection_string = connection_string
|
||||
|
||||
def setUp(self):
|
||||
self.cnxn = pyodbc.connect(self.connection_string)
|
||||
self.cursor = self.cnxn.cursor()
|
||||
|
||||
for i in range(3):
|
||||
try:
|
||||
self.cursor.execute("drop table t%d" % i)
|
||||
self.cnxn.commit()
|
||||
except:
|
||||
pass
|
||||
|
||||
self.cnxn.rollback()
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
self.cursor.close()
|
||||
self.cnxn.close()
|
||||
except:
|
||||
# If we've already closed the cursor or connection, exceptions are thrown.
|
||||
pass
|
||||
|
||||
def test_multiple_bindings(self):
|
||||
"More than one bind and select on a cursor"
|
||||
self.cursor.execute("create table t1(n int)")
|
||||
self.cursor.execute("insert into t1 values (?)", 1)
|
||||
self.cursor.execute("insert into t1 values (?)", 2)
|
||||
self.cursor.execute("insert into t1 values (?)", 3)
|
||||
for i in range(3):
|
||||
self.cursor.execute("select n from t1 where n < ?", 10)
|
||||
self.cursor.execute("select n from t1 where n < 3")
|
||||
|
||||
|
||||
def test_different_bindings(self):
|
||||
self.cursor.execute("create table t1(n int)")
|
||||
self.cursor.execute("create table t2(d datetime)")
|
||||
self.cursor.execute("insert into t1 values (?)", 1)
|
||||
self.cursor.execute("insert into t2 values (?)", datetime.now())
|
||||
|
||||
def test_drivers(self):
|
||||
p = pyodbc.drivers()
|
||||
self.assertTrue(isinstance(p, list))
|
||||
|
||||
def test_datasources(self):
|
||||
p = pyodbc.dataSources()
|
||||
self.assertTrue(isinstance(p, dict))
|
||||
|
||||
def test_getinfo_string(self):
|
||||
value = self.cnxn.getinfo(pyodbc.SQL_CATALOG_NAME_SEPARATOR)
|
||||
self.assertTrue(isinstance(value, str))
|
||||
|
||||
def test_getinfo_bool(self):
|
||||
value = self.cnxn.getinfo(pyodbc.SQL_ACCESSIBLE_TABLES)
|
||||
self.assertTrue(isinstance(value, bool))
|
||||
|
||||
def test_getinfo_int(self):
|
||||
value = self.cnxn.getinfo(pyodbc.SQL_DEFAULT_TXN_ISOLATION)
|
||||
self.assertTrue(isinstance(value, int))
|
||||
|
||||
def test_getinfo_smallint(self):
|
||||
value = self.cnxn.getinfo(pyodbc.SQL_CONCAT_NULL_BEHAVIOR)
|
||||
self.assertTrue(isinstance(value, int))
|
||||
|
||||
def _test_strtype(self, sqltype, value, colsize=None):
|
||||
"""
|
||||
The implementation for string, Unicode, and binary tests.
|
||||
"""
|
||||
assert colsize is None or (value is None or colsize >= len(value))
|
||||
|
||||
if colsize:
|
||||
sql = "create table t1(s {}({}))".format(sqltype, colsize)
|
||||
else:
|
||||
sql = "create table t1(s %s)" % sqltype
|
||||
|
||||
self.cursor.execute(sql)
|
||||
self.cursor.execute("insert into t1 values(?)", value)
|
||||
v = self.cursor.execute("select * from t1").fetchone()[0]
|
||||
self.assertEqual(type(v), type(value))
|
||||
|
||||
if value is not None:
|
||||
self.assertEqual(len(v), len(value))
|
||||
|
||||
self.assertEqual(v, value)
|
||||
|
||||
# Reported by Andy Hochhaus in the pyodbc group: In 2.1.7 and earlier, a hardcoded length of 255 was used to
|
||||
# determine whether a parameter was bound as a SQL_VARCHAR or SQL_LONGVARCHAR. Apparently SQL Server chokes if
|
||||
# we bind as a SQL_LONGVARCHAR and the target column size is 8000 or less, which is considers just SQL_VARCHAR.
|
||||
# This means binding a 256 character value would cause problems if compared with a VARCHAR column under
|
||||
# 8001. We now use SQLGetTypeInfo to determine the time to switch.
|
||||
#
|
||||
# [42000] [Microsoft][SQL Server Native Client 10.0][SQL Server]The data types varchar and text are incompatible in the equal to operator.
|
||||
|
||||
self.cursor.execute("select * from t1 where s=?", value)
|
||||
|
||||
|
||||
def _test_strliketype(self, sqltype, value, colsize=None):
|
||||
"""
|
||||
The implementation for text, image, ntext, and binary.
|
||||
|
||||
These types do not support comparison operators.
|
||||
"""
|
||||
assert colsize is None or (value is None or colsize >= len(value))
|
||||
|
||||
if colsize:
|
||||
sql = "create table t1(s {}({}))".format(sqltype, colsize)
|
||||
else:
|
||||
sql = "create table t1(s %s)" % sqltype
|
||||
|
||||
self.cursor.execute(sql)
|
||||
self.cursor.execute("insert into t1 values(?)", value)
|
||||
v = self.cursor.execute("select * from t1").fetchone()[0]
|
||||
self.assertEqual(type(v), type(value))
|
||||
|
||||
if value is not None:
|
||||
self.assertEqual(len(v), len(value))
|
||||
|
||||
self.assertEqual(v, value)
|
||||
|
||||
#
|
||||
# text
|
||||
#
|
||||
|
||||
def test_text_null(self):
|
||||
self._test_strtype('text', None, 100)
|
||||
|
||||
# Generate a test for each fencepost size: test_text_0, etc.
|
||||
def _maketest(value):
|
||||
def t(self):
|
||||
self._test_strtype('text', value, len(value))
|
||||
return t
|
||||
for value in STR_FENCEPOSTS:
|
||||
locals()['test_text_%s' % len(value)] = _maketest(value)
|
||||
|
||||
def test_text_upperlatin(self):
|
||||
self._test_strtype('varchar', 'á')
|
||||
|
||||
#
|
||||
# blob
|
||||
#
|
||||
|
||||
def test_null_blob(self):
|
||||
self._test_strtype('blob', None, 100)
|
||||
|
||||
def test_large_null_blob(self):
|
||||
# Bug 1575064
|
||||
self._test_strtype('blob', None, 4000)
|
||||
|
||||
# Generate a test for each fencepost size: test_unicode_0, etc.
|
||||
def _maketest(value):
|
||||
def t(self):
|
||||
self._test_strtype('blob', value, len(value))
|
||||
return t
|
||||
for value in BYTE_FENCEPOSTS:
|
||||
locals()['test_blob_%s' % len(value)] = _maketest(value)
|
||||
|
||||
def test_subquery_params(self):
|
||||
"""Ensure parameter markers work in a subquery"""
|
||||
self.cursor.execute("create table t1(id integer, s varchar(20))")
|
||||
self.cursor.execute("insert into t1 values (?,?)", 1, 'test')
|
||||
row = self.cursor.execute("""
|
||||
select x.id
|
||||
from (
|
||||
select id
|
||||
from t1
|
||||
where s = ?
|
||||
and id between ? and ?
|
||||
) x
|
||||
""", 'test', 1, 10).fetchone()
|
||||
self.assertNotEqual(row, None)
|
||||
self.assertEqual(row[0], 1)
|
||||
|
||||
def _exec(self):
|
||||
self.cursor.execute(self.sql)
|
||||
|
||||
def test_close_cnxn(self):
|
||||
"""Make sure using a Cursor after closing its connection doesn't crash."""
|
||||
|
||||
self.cursor.execute("create table t1(id integer, s varchar(20))")
|
||||
self.cursor.execute("insert into t1 values (?,?)", 1, 'test')
|
||||
self.cursor.execute("select * from t1")
|
||||
|
||||
self.cnxn.close()
|
||||
|
||||
# Now that the connection is closed, we expect an exception. (If the code attempts to use
|
||||
# the HSTMT, we'll get an access violation instead.)
|
||||
self.sql = "select * from t1"
|
||||
self.assertRaises(pyodbc.ProgrammingError, self._exec)
|
||||
|
||||
def test_negative_row_index(self):
|
||||
self.cursor.execute("create table t1(s varchar(20))")
|
||||
self.cursor.execute("insert into t1 values(?)", "1")
|
||||
row = self.cursor.execute("select * from t1").fetchone()
|
||||
self.assertEqual(row[0], "1")
|
||||
self.assertEqual(row[-1], "1")
|
||||
|
||||
def test_version(self):
|
||||
self.assertEqual(3, len(pyodbc.version.split('.'))) # 1.3.1 etc.
|
||||
|
||||
#
|
||||
# ints and floats
|
||||
#
|
||||
|
||||
def test_int(self):
|
||||
value = 1234
|
||||
self.cursor.execute("create table t1(n int)")
|
||||
self.cursor.execute("insert into t1 values (?)", value)
|
||||
result = self.cursor.execute("select n from t1").fetchone()[0]
|
||||
self.assertEqual(result, value)
|
||||
|
||||
def test_negative_int(self):
|
||||
value = -1
|
||||
self.cursor.execute("create table t1(n int)")
|
||||
self.cursor.execute("insert into t1 values (?)", value)
|
||||
result = self.cursor.execute("select n from t1").fetchone()[0]
|
||||
self.assertEqual(result, value)
|
||||
|
||||
def test_bigint(self):
|
||||
input = 3000000000
|
||||
self.cursor.execute("create table t1(d bigint)")
|
||||
self.cursor.execute("insert into t1 values (?)", input)
|
||||
result = self.cursor.execute("select d from t1").fetchone()[0]
|
||||
self.assertEqual(result, input)
|
||||
|
||||
def test_negative_bigint(self):
|
||||
# Issue 186: BIGINT problem on 32-bit architecture
|
||||
input = -430000000
|
||||
self.cursor.execute("create table t1(d bigint)")
|
||||
self.cursor.execute("insert into t1 values (?)", input)
|
||||
result = self.cursor.execute("select d from t1").fetchone()[0]
|
||||
self.assertEqual(result, input)
|
||||
|
||||
def test_float(self):
|
||||
value = 1234.567
|
||||
self.cursor.execute("create table t1(n float)")
|
||||
self.cursor.execute("insert into t1 values (?)", value)
|
||||
result = self.cursor.execute("select n from t1").fetchone()[0]
|
||||
self.assertEqual(result, value)
|
||||
|
||||
def test_negative_float(self):
|
||||
value = -200
|
||||
self.cursor.execute("create table t1(n float)")
|
||||
self.cursor.execute("insert into t1 values (?)", value)
|
||||
result = self.cursor.execute("select n from t1").fetchone()[0]
|
||||
self.assertEqual(value, result)
|
||||
|
||||
#
|
||||
# rowcount
|
||||
#
|
||||
|
||||
# Note: SQLRowCount does not define what the driver must return after a select statement
|
||||
# and says that its value should not be relied upon. The sqliteodbc driver is hardcoded to
|
||||
# return 0 so I've deleted the test.
|
||||
|
||||
def test_rowcount_delete(self):
|
||||
self.assertEqual(self.cursor.rowcount, -1)
|
||||
self.cursor.execute("create table t1(i int)")
|
||||
count = 4
|
||||
for i in range(count):
|
||||
self.cursor.execute("insert into t1 values (?)", i)
|
||||
self.cursor.execute("delete from t1")
|
||||
self.assertEqual(self.cursor.rowcount, count)
|
||||
|
||||
def test_rowcount_nodata(self):
|
||||
"""
|
||||
This represents a different code path than a delete that deleted something.
|
||||
|
||||
The return value is SQL_NO_DATA and code after it was causing an error. We could use SQL_NO_DATA to step over
|
||||
the code that errors out and drop down to the same SQLRowCount code. On the other hand, we could hardcode a
|
||||
zero return value.
|
||||
"""
|
||||
self.cursor.execute("create table t1(i int)")
|
||||
# This is a different code path internally.
|
||||
self.cursor.execute("delete from t1")
|
||||
self.assertEqual(self.cursor.rowcount, 0)
|
||||
|
||||
# In the 2.0.x branch, Cursor.execute sometimes returned the cursor and sometimes the rowcount. This proved very
|
||||
# confusing when things went wrong and added very little value even when things went right since users could always
|
||||
# use: cursor.execute("...").rowcount
|
||||
|
||||
def test_retcursor_delete(self):
|
||||
self.cursor.execute("create table t1(i int)")
|
||||
self.cursor.execute("insert into t1 values (1)")
|
||||
v = self.cursor.execute("delete from t1")
|
||||
self.assertEqual(v, self.cursor)
|
||||
|
||||
def test_retcursor_nodata(self):
|
||||
"""
|
||||
This represents a different code path than a delete that deleted something.
|
||||
|
||||
The return value is SQL_NO_DATA and code after it was causing an error. We could use SQL_NO_DATA to step over
|
||||
the code that errors out and drop down to the same SQLRowCount code.
|
||||
"""
|
||||
self.cursor.execute("create table t1(i int)")
|
||||
# This is a different code path internally.
|
||||
v = self.cursor.execute("delete from t1")
|
||||
self.assertEqual(v, self.cursor)
|
||||
|
||||
def test_retcursor_select(self):
|
||||
self.cursor.execute("create table t1(i int)")
|
||||
self.cursor.execute("insert into t1 values (1)")
|
||||
v = self.cursor.execute("select * from t1")
|
||||
self.assertEqual(v, self.cursor)
|
||||
|
||||
#
|
||||
# misc
|
||||
#
|
||||
|
||||
def test_lower_case(self):
|
||||
"Ensure pyodbc.lowercase forces returned column names to lowercase."
|
||||
|
||||
# Has to be set before creating the cursor, so we must recreate self.cursor.
|
||||
|
||||
pyodbc.lowercase = True
|
||||
self.cursor = self.cnxn.cursor()
|
||||
|
||||
self.cursor.execute("create table t1(Abc int, dEf int)")
|
||||
self.cursor.execute("select * from t1")
|
||||
|
||||
names = [ t[0] for t in self.cursor.description ]
|
||||
names.sort()
|
||||
|
||||
self.assertEqual(names, [ "abc", "def" ])
|
||||
|
||||
# Put it back so other tests don't fail.
|
||||
pyodbc.lowercase = False
|
||||
|
||||
def test_row_description(self):
|
||||
"""
|
||||
Ensure Cursor.description is accessible as Row.cursor_description.
|
||||
"""
|
||||
self.cursor = self.cnxn.cursor()
|
||||
self.cursor.execute("create table t1(a int, b char(3))")
|
||||
self.cnxn.commit()
|
||||
self.cursor.execute("insert into t1 values(1, 'abc')")
|
||||
|
||||
row = self.cursor.execute("select * from t1").fetchone()
|
||||
|
||||
self.assertEqual(self.cursor.description, row.cursor_description)
|
||||
|
||||
|
||||
def test_executemany(self):
|
||||
self.cursor.execute("create table t1(a int, b varchar(10))")
|
||||
|
||||
params = [ (i, str(i)) for i in range(1, 6) ]
|
||||
|
||||
self.cursor.executemany("insert into t1(a, b) values (?,?)", params)
|
||||
|
||||
count = self.cursor.execute("select count(*) from t1").fetchone()[0]
|
||||
self.assertEqual(count, len(params))
|
||||
|
||||
self.cursor.execute("select a, b from t1 order by a")
|
||||
rows = self.cursor.fetchall()
|
||||
self.assertEqual(count, len(rows))
|
||||
|
||||
for param, row in zip(params, rows):
|
||||
self.assertEqual(param[0], row[0])
|
||||
self.assertEqual(param[1], row[1])
|
||||
|
||||
|
||||
def test_executemany_one(self):
|
||||
"Pass executemany a single sequence"
|
||||
self.cursor.execute("create table t1(a int, b varchar(10))")
|
||||
|
||||
params = [ (1, "test") ]
|
||||
|
||||
self.cursor.executemany("insert into t1(a, b) values (?,?)", params)
|
||||
|
||||
count = self.cursor.execute("select count(*) from t1").fetchone()[0]
|
||||
self.assertEqual(count, len(params))
|
||||
|
||||
self.cursor.execute("select a, b from t1 order by a")
|
||||
rows = self.cursor.fetchall()
|
||||
self.assertEqual(count, len(rows))
|
||||
|
||||
for param, row in zip(params, rows):
|
||||
self.assertEqual(param[0], row[0])
|
||||
self.assertEqual(param[1], row[1])
|
||||
|
||||
|
||||
def test_executemany_failure(self):
|
||||
"""
|
||||
Ensure that an exception is raised if one query in an executemany fails.
|
||||
"""
|
||||
self.cursor.execute("create table t1(a int, b varchar(10))")
|
||||
|
||||
params = [ (1, 'good'),
|
||||
('error', 'not an int'),
|
||||
(3, 'good') ]
|
||||
|
||||
self.assertRaises(pyodbc.Error, self.cursor.executemany, "insert into t1(a, b) value (?, ?)", params)
|
||||
|
||||
|
||||
def test_row_slicing(self):
|
||||
self.cursor.execute("create table t1(a int, b int, c int, d int)");
|
||||
self.cursor.execute("insert into t1 values(1,2,3,4)")
|
||||
|
||||
row = self.cursor.execute("select * from t1").fetchone()
|
||||
|
||||
result = row[:]
|
||||
self.assertTrue(result is row)
|
||||
|
||||
result = row[:-1]
|
||||
self.assertEqual(result, (1,2,3))
|
||||
|
||||
result = row[0:4]
|
||||
self.assertTrue(result is row)
|
||||
|
||||
|
||||
def test_row_repr(self):
|
||||
self.cursor.execute("create table t1(a int, b int, c int, d int)");
|
||||
self.cursor.execute("insert into t1 values(1,2,3,4)")
|
||||
|
||||
row = self.cursor.execute("select * from t1").fetchone()
|
||||
|
||||
result = str(row)
|
||||
self.assertEqual(result, "(1, 2, 3, 4)")
|
||||
|
||||
result = str(row[:-1])
|
||||
self.assertEqual(result, "(1, 2, 3)")
|
||||
|
||||
result = str(row[:1])
|
||||
self.assertEqual(result, "(1,)")
|
||||
|
||||
|
||||
def test_view_select(self):
|
||||
# Reported in forum: Can't select from a view? I think I do this a lot, but another test never hurts.
|
||||
|
||||
# Create a table (t1) with 3 rows and a view (t2) into it.
|
||||
self.cursor.execute("create table t1(c1 int identity(1, 1), c2 varchar(50))")
|
||||
for i in range(3):
|
||||
self.cursor.execute("insert into t1(c2) values (?)", "string%s" % i)
|
||||
self.cursor.execute("create view t2 as select * from t1")
|
||||
|
||||
# Select from the view
|
||||
self.cursor.execute("select * from t2")
|
||||
rows = self.cursor.fetchall()
|
||||
self.assertTrue(rows is not None)
|
||||
self.assertTrue(len(rows) == 3)
|
||||
|
||||
def test_autocommit(self):
|
||||
self.assertEqual(self.cnxn.autocommit, False)
|
||||
|
||||
othercnxn = pyodbc.connect(self.connection_string, autocommit=True)
|
||||
self.assertEqual(othercnxn.autocommit, True)
|
||||
|
||||
othercnxn.autocommit = False
|
||||
self.assertEqual(othercnxn.autocommit, False)
|
||||
|
||||
def test_skip(self):
|
||||
# Insert 1, 2, and 3. Fetch 1, skip 2, fetch 3.
|
||||
|
||||
self.cursor.execute("create table t1(id int)");
|
||||
for i in range(1, 5):
|
||||
self.cursor.execute("insert into t1 values(?)", i)
|
||||
self.cursor.execute("select id from t1 order by id")
|
||||
self.assertEqual(self.cursor.fetchone()[0], 1)
|
||||
self.cursor.skip(2)
|
||||
self.assertEqual(self.cursor.fetchone()[0], 4)
|
||||
|
||||
def test_sets_execute(self):
|
||||
# Only lists and tuples are allowed.
|
||||
def f():
|
||||
self.cursor.execute("create table t1 (word varchar (100))")
|
||||
words = set (['a'])
|
||||
self.cursor.execute("insert into t1 (word) VALUES (?)", [words])
|
||||
|
||||
self.assertRaises(pyodbc.ProgrammingError, f)
|
||||
|
||||
def test_sets_executemany(self):
|
||||
# Only lists and tuples are allowed.
|
||||
def f():
|
||||
self.cursor.execute("create table t1 (word varchar (100))")
|
||||
words = set (['a'])
|
||||
self.cursor.executemany("insert into t1 (word) values (?)", [words])
|
||||
|
||||
self.assertRaises(TypeError, f)
|
||||
|
||||
def test_row_execute(self):
|
||||
"Ensure we can use a Row object as a parameter to execute"
|
||||
self.cursor.execute("create table t1(n int, s varchar(10))")
|
||||
self.cursor.execute("insert into t1 values (1, 'a')")
|
||||
row = self.cursor.execute("select n, s from t1").fetchone()
|
||||
self.assertNotEqual(row, None)
|
||||
|
||||
self.cursor.execute("create table t2(n int, s varchar(10))")
|
||||
self.cursor.execute("insert into t2 values (?, ?)", row)
|
||||
|
||||
def test_row_executemany(self):
|
||||
"Ensure we can use a Row object as a parameter to executemany"
|
||||
self.cursor.execute("create table t1(n int, s varchar(10))")
|
||||
|
||||
for i in range(3):
|
||||
self.cursor.execute("insert into t1 values (?, ?)", i, chr(ord('a')+i))
|
||||
|
||||
rows = self.cursor.execute("select n, s from t1").fetchall()
|
||||
self.assertNotEqual(len(rows), 0)
|
||||
|
||||
self.cursor.execute("create table t2(n int, s varchar(10))")
|
||||
self.cursor.executemany("insert into t2 values (?, ?)", rows)
|
||||
|
||||
def test_description(self):
|
||||
"Ensure cursor.description is correct"
|
||||
|
||||
self.cursor.execute("create table t1(n int, s text)")
|
||||
self.cursor.execute("insert into t1 values (1, 'abc')")
|
||||
self.cursor.execute("select * from t1")
|
||||
|
||||
# (I'm not sure the precision of an int is constant across different versions, bits, so I'm hand checking the
|
||||
# items I do know.
|
||||
|
||||
# int
|
||||
t = self.cursor.description[0]
|
||||
self.assertEqual(t[0], 'n')
|
||||
self.assertEqual(t[1], int)
|
||||
self.assertEqual(t[5], 0) # scale
|
||||
self.assertEqual(t[6], True) # nullable
|
||||
|
||||
# text
|
||||
t = self.cursor.description[1]
|
||||
self.assertEqual(t[0], 's')
|
||||
self.assertEqual(t[1], str)
|
||||
self.assertEqual(t[5], 0) # scale
|
||||
self.assertEqual(t[6], True) # nullable
|
||||
|
||||
def test_row_equal(self):
|
||||
self.cursor.execute("create table t1(n int, s varchar(20))")
|
||||
self.cursor.execute("insert into t1 values (1, 'test')")
|
||||
row1 = self.cursor.execute("select n, s from t1").fetchone()
|
||||
row2 = self.cursor.execute("select n, s from t1").fetchone()
|
||||
b = (row1 == row2)
|
||||
self.assertEqual(b, True)
|
||||
|
||||
def test_row_gtlt(self):
|
||||
self.cursor.execute("create table t1(n int, s varchar(20))")
|
||||
self.cursor.execute("insert into t1 values (1, 'test1')")
|
||||
self.cursor.execute("insert into t1 values (1, 'test2')")
|
||||
rows = self.cursor.execute("select n, s from t1 order by s").fetchall()
|
||||
self.assertTrue(rows[0] < rows[1])
|
||||
self.assertTrue(rows[0] <= rows[1])
|
||||
self.assertTrue(rows[1] > rows[0])
|
||||
self.assertTrue(rows[1] >= rows[0])
|
||||
self.assertTrue(rows[0] != rows[1])
|
||||
|
||||
rows = list(rows)
|
||||
rows.sort() # uses <
|
||||
|
||||
def _test_context_manager(self):
|
||||
# TODO: This is failing, but it may be due to the design of sqlite. I've disabled it
|
||||
# for now until I can research it some more.
|
||||
|
||||
# WARNING: This isn't working right now. We've set the driver's autocommit to "off",
|
||||
# but that doesn't automatically start a transaction. I'm not familiar enough with the
|
||||
# internals of the driver to tell what is going on, but it looks like there is support
|
||||
# for the autocommit flag.
|
||||
#
|
||||
# I thought it might be a timing issue, like it not actually starting a txn until you
|
||||
# try to do something, but that doesn't seem to work either. I'll leave this in to
|
||||
# remind us that it isn't working yet but we need to contact the SQLite ODBC driver
|
||||
# author for some guidance.
|
||||
|
||||
with pyodbc.connect(self.connection_string) as cnxn:
|
||||
cursor = cnxn.cursor()
|
||||
cursor.execute("begin")
|
||||
cursor.execute("create table t1(i int)")
|
||||
cursor.execute('rollback')
|
||||
|
||||
# The connection should be closed now.
|
||||
def test():
|
||||
cnxn.execute('rollback')
|
||||
self.assertRaises(pyodbc.Error, test)
|
||||
|
||||
def test_untyped_none(self):
|
||||
# From issue 129
|
||||
value = self.cursor.execute("select ?", None).fetchone()[0]
|
||||
self.assertEqual(value, None)
|
||||
|
||||
def test_large_update_nodata(self):
|
||||
self.cursor.execute('create table t1(a blob)')
|
||||
hundredkb = 'x'*100*1024
|
||||
self.cursor.execute('update t1 set a=? where 1=0', (hundredkb,))
|
||||
|
||||
def test_no_fetch(self):
|
||||
# Issue 89 with FreeTDS: Multiple selects (or catalog functions that issue selects) without fetches seem to
|
||||
# confuse the driver.
|
||||
self.cursor.execute('select 1')
|
||||
self.cursor.execute('select 1')
|
||||
self.cursor.execute('select 1')
|
||||
|
||||
|
||||
def main():
|
||||
from argparse import ArgumentParser
|
||||
parser = ArgumentParser(usage=usage)
|
||||
parser.add_argument("-v", "--verbose", action="count", default=0, help="increment test verbosity (can be used multiple times)")
|
||||
parser.add_argument("-d", "--debug", action="store_true", default=False, help="print debugging items")
|
||||
parser.add_argument("-t", "--test", help="run only the named test")
|
||||
parser.add_argument("conn_str", nargs="*", help="connection string for sqlite")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if len(args.conn_str) > 1:
|
||||
parser.error('Only one argument is allowed. Do you need quotes around the connection string?')
|
||||
|
||||
if not args.conn_str:
|
||||
connection_string = load_setup_connection_string('sqlitetests')
|
||||
|
||||
if not connection_string:
|
||||
parser.print_help()
|
||||
raise SystemExit()
|
||||
else:
|
||||
connection_string = args.conn_str[0]
|
||||
|
||||
if args.verbose:
|
||||
cnxn = pyodbc.connect(connection_string)
|
||||
print_library_info(cnxn)
|
||||
cnxn.close()
|
||||
|
||||
suite = load_tests(SqliteTestCase, args.test, connection_string)
|
||||
|
||||
testRunner = unittest.TextTestRunner(verbosity=args.verbose)
|
||||
result = testRunner.run(suite)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
# Add the build directory to the path so we're testing the latest build, not the installed version.
|
||||
|
||||
add_to_path()
|
||||
|
||||
import pyodbc
|
||||
sys.exit(0 if main().wasSuccessful() else 1)
|
122
testutils.py
Normal file
122
testutils.py
Normal file
@ -0,0 +1,122 @@
|
||||
from datetime import datetime
|
||||
import importlib.machinery
|
||||
import os
|
||||
from os.path import join, dirname, abspath
|
||||
import platform
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
|
||||
def add_to_path():
|
||||
"""
|
||||
Prepends the build directory to the path so that newly built pyodbc libraries are
|
||||
used, allowing it to be tested without pip-installing it.
|
||||
"""
|
||||
# look for the suffixes used in the build filenames, e.g. ".cp38-win_amd64.pyd",
|
||||
# ".cpython-38-darwin.so", ".cpython-38-x86_64-linux-gnu.so", etc.
|
||||
library_exts = [ext for ext in importlib.machinery.EXTENSION_SUFFIXES if ext != '.pyd']
|
||||
# generate the name of the pyodbc build file(s)
|
||||
library_names = ['pyodbc%s' % ext for ext in library_exts]
|
||||
|
||||
# the build directory is assumed to be one directory up from this file
|
||||
build_dir = join(dirname(dirname(abspath(__file__))), 'build')
|
||||
|
||||
# find all the relevant pyodbc build files, and get their modified dates
|
||||
file_info = [
|
||||
(os.path.getmtime(join(dirpath, file)), join(dirpath, file))
|
||||
for dirpath, dirs, files in os.walk(build_dir)
|
||||
for file in files
|
||||
if file in library_names
|
||||
]
|
||||
if file_info:
|
||||
file_info.sort() # put them in chronological order
|
||||
library_modified_dt, library_path = file_info[-1] # use the latest one
|
||||
# add the build directory to the Python path
|
||||
sys.path.insert(0, dirname(library_path))
|
||||
print('Library: {} (last modified {})'.format(library_path, datetime.fromtimestamp(library_modified_dt)))
|
||||
else:
|
||||
print('Did not find the pyodbc library in the build directory. Will use the installed version.')
|
||||
|
||||
|
||||
def print_library_info(cnxn):
|
||||
import pyodbc
|
||||
print('python: %s' % sys.version)
|
||||
print('pyodbc: %s %s' % (pyodbc.version, os.path.abspath(pyodbc.__file__)))
|
||||
print('odbc: %s' % cnxn.getinfo(pyodbc.SQL_ODBC_VER))
|
||||
print('driver: %s %s' % (cnxn.getinfo(pyodbc.SQL_DRIVER_NAME), cnxn.getinfo(pyodbc.SQL_DRIVER_VER)))
|
||||
print(' supports ODBC version %s' % cnxn.getinfo(pyodbc.SQL_DRIVER_ODBC_VER))
|
||||
print('os: %s' % platform.system())
|
||||
print('unicode: SQLWCHAR=%s' % pyodbc.SQLWCHAR_SIZE)
|
||||
|
||||
cursor = cnxn.cursor()
|
||||
for typename in ['VARCHAR', 'WVARCHAR', 'BINARY']:
|
||||
t = getattr(pyodbc, 'SQL_' + typename)
|
||||
cursor.getTypeInfo(t)
|
||||
row = cursor.fetchone()
|
||||
print('Max %s = %s' % (typename, row and row[2] or '(not supported)'))
|
||||
|
||||
if platform.system() == 'Windows':
|
||||
print(' %s' % ' '.join([s for s in platform.win32_ver() if s]))
|
||||
|
||||
|
||||
def discover_and_run(top_level_dir='.', start_dir='.', pattern='test*.py', verbosity=0):
|
||||
"""Finds all the test cases in the start directory and runs them"""
|
||||
tests = unittest.defaultTestLoader.discover(top_level_dir=top_level_dir, start_dir=start_dir, pattern=pattern)
|
||||
runner = unittest.TextTestRunner(verbosity=verbosity)
|
||||
result = runner.run(tests)
|
||||
return result
|
||||
|
||||
|
||||
def load_tests(testclass, name, *args):
|
||||
"""
|
||||
Returns a TestSuite for tests in `testclass`.
|
||||
|
||||
name
|
||||
Optional test name if you only want to run 1 test. If not provided all tests in `testclass` will be loaded.
|
||||
|
||||
args
|
||||
Arguments for the test class constructor. These will be passed after the test method name.
|
||||
"""
|
||||
if name:
|
||||
if not name.startswith('test_'):
|
||||
name = 'test_%s' % name
|
||||
names = [ name ]
|
||||
|
||||
else:
|
||||
names = [ method for method in dir(testclass) if method.startswith('test_') ]
|
||||
|
||||
return unittest.TestSuite([ testclass(name, *args) for name in names ])
|
||||
|
||||
|
||||
def load_setup_connection_string(section):
|
||||
"""
|
||||
Attempts to read the default connection string from the setup.cfg file.
|
||||
|
||||
If the file does not exist or if it exists but does not contain the connection string, None is returned. If the
|
||||
file exists but cannot be parsed, an exception is raised.
|
||||
"""
|
||||
from os.path import exists, join, dirname
|
||||
from configparser import ConfigParser
|
||||
|
||||
FILENAME = 'setup.cfg'
|
||||
KEY = 'connection-string'
|
||||
|
||||
path = dirname(abspath(__file__))
|
||||
while True:
|
||||
fqn = join(path, 'tmp', FILENAME)
|
||||
if exists(fqn):
|
||||
break
|
||||
parent = dirname(path)
|
||||
print('{} --> {}'.format(path, parent))
|
||||
if parent == path:
|
||||
return None
|
||||
path = parent
|
||||
|
||||
try:
|
||||
p = ConfigParser()
|
||||
p.read(fqn)
|
||||
except:
|
||||
raise SystemExit('Unable to parse %s: %s' % (path, sys.exc_info()[1]))
|
||||
|
||||
if p.has_option(section, KEY):
|
||||
return p.get(section, KEY)
|
Loading…
Reference in New Issue
Block a user