libreoffice/803-connectivity-workben-postgresql.diff

1403 lines
55 KiB
Diff

---
connectivity/workben/postgresql/ddl.py | 185 ++++++++++++
connectivity/workben/postgresql/main.py | 90 ++++++
connectivity/workben/postgresql/makefile.mk | 103 +++++++
connectivity/workben/postgresql/metadata.py | 151 ++++++++++
.../workben/postgresql/preparedstatement.py | 228 +++++++++++++++
connectivity/workben/postgresql/sdbcx.py | 306 ++++++++++++++++++++
connectivity/workben/postgresql/statement.py | 277 ++++++++++++++++++
7 files changed, 1340 insertions(+), 0 deletions(-)
create mode 100644 connectivity/workben/postgresql/ddl.py
create mode 100644 connectivity/workben/postgresql/main.py
create mode 100644 connectivity/workben/postgresql/makefile.mk
create mode 100644 connectivity/workben/postgresql/metadata.py
create mode 100644 connectivity/workben/postgresql/preparedstatement.py
create mode 100644 connectivity/workben/postgresql/sdbcx.py
create mode 100644 connectivity/workben/postgresql/statement.py
diff --git connectivity/workben/postgresql/ddl.py connectivity/workben/postgresql/ddl.py
new file mode 100644
index 0000000..4c1c5e2
--- /dev/null
+++ connectivity/workben/postgresql/ddl.py
@@ -0,0 +1,185 @@
+#*************************************************************************
+#
+# $RCSfile: ddl.py,v $
+#
+# $Revision: 1.1.2.5 $
+#
+# last change: $Author: jbu $ $Date: 2007/01/07 13:50:38 $
+#
+# The Contents of this file are made available subject to the terms of
+# either of the following licenses
+#
+# - GNU Lesser General Public License Version 2.1
+# - Sun Industry Standards Source License Version 1.1
+#
+# Sun Microsystems Inc., October, 2000
+#
+# GNU Lesser General Public License Version 2.1
+# =============================================
+# Copyright 2000 by Sun Microsystems, Inc.
+# 901 San Antonio Road, Palo Alto, CA 94303, USA
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+#
+# Sun Industry Standards Source License Version 1.1
+# =================================================
+# The contents of this file are subject to the Sun Industry Standards
+# Source License Version 1.1 (the "License"); You may not use this file
+# except in compliance with the License. You may obtain a copy of the
+# License at http://www.openoffice.org/license.html.
+#
+# Software provided under this License is provided on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
+# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
+# See the License for the specific provisions governing your rights and
+# obligations concerning the Software.
+#
+# The Initial Developer of the Original Code is: Ralph Thomas
+#
+# Copyright: 2000 by Sun Microsystems, Inc.
+#
+# All Rights Reserved.
+#
+# Contributor(s): Ralph Thomas, Joerg Budischewski
+#
+#*************************************************************************
+from com.sun.star.sdbc import SQLException
+import sys
+
+def dumpResultSet( rs ):
+    """Write the column names and all rows of rs to stdout, tab separated, then rewind rs."""
+    meta = rs.getMetaData()
+    columns = range( 1, meta.getColumnCount() + 1 )
+    header = [ meta.getColumnName( col ) + "\t" for col in columns ]
+    sys.stdout.write( "".join( header ) + "\n" )
+    while rs.next():
+        fields = [ rs.getString( col ) + "\t" for col in columns ]
+        sys.stdout.write( "".join( fields ) + "\n" )
+    # leave the cursor before the first row so that rs can be read again
+    rs.beforeFirst()
+
+
+def executeIgnoringException( stmt, sql ):
+    """Run the update sql on stmt; an SQLException it raises is deliberately dropped."""
+    try:
+        stmt.executeUpdate( sql )
+    except SQLException: pass  # best effort, a failing statement is not an error here
+
+def cleanGroupsAndUsers( stmt ):
+    """Drop all groups and users whose names match LIKE 'pqsdbc_%'."""
+    cleanups = ( ( "DROP GROUP ", "SELECT groname FROM pg_group WHERE groname LIKE 'pqsdbc_%'" ),
+                 ( "DROP USER ", "SELECT usename FROM pg_user WHERE usename LIKE 'pqsdbc_%'" ) )
+    stmt2 = stmt.getConnection().createStatement()  # stmt stays busy delivering the names
+    try:
+        for drop, query in cleanups:
+            rs = stmt.executeQuery( query )
+            while rs.next():
+                stmt2.executeUpdate( drop + rs.getString(1) )
+            rs.close()
+    finally:
+        stmt2.close()  # release the helper statement, also when a DROP fails
+
+def executeDDLs( connection ):
+    """Bring the test objects in the database into their initial state.
+
+    Objects of the same name are dropped first (failing DROPs are ignored),
+    then domains, users, groups, a schema, tables, a view and rows are created.
+    """
+    stmt = connection.createStatement()
+    try:
+        executeIgnoringException( stmt, "DROP VIEW customer2" )
+        executeIgnoringException( stmt, "DROP TABLE orderpos" )
+        executeIgnoringException( stmt, "DROP TABLE ordertab" )
+        executeIgnoringException( stmt, "DROP TABLE product" )
+        executeIgnoringException( stmt, "DROP TABLE customer" )
+        executeIgnoringException( stmt, "DROP TABLE blub" )
+        executeIgnoringException( stmt, "DROP TABLE foo" )
+        executeIgnoringException( stmt, "DROP TABLE nooid" )
+        executeIgnoringException( stmt, "DROP TABLE nooid2" )
+        cleanGroupsAndUsers( stmt )
+        executeIgnoringException( stmt, "DROP DOMAIN pqsdbc_short" )
+        executeIgnoringException( stmt, "DROP DOMAIN pqsdbc_amount" )
+        executeIgnoringException( stmt, "DROP SCHEMA pqsdbc_test" )
+
+        ddls = (
+            "BEGIN",
+            "CREATE DOMAIN pqsdbc_short AS int2",
+            "CREATE DOMAIN pqsdbc_amount AS integer",
+            "CREATE USER pqsdbc_joe",
+            "CREATE USER pqsdbc_susy",
+            "CREATE USER pqsdbc_boss",
+            "CREATE USER pqsdbc_customer", # technical user (e.g. a webfrontend)
+            "CREATE GROUP pqsdbc_employees WITH USER pqsdbc_joe,pqsdbc_susy",
+            "CREATE GROUP pqsdbc_admin WITH USER pqsdbc_susy,pqsdbc_boss",
+            "CREATE SCHEMA pqsdbc_test",
+            "CREATE TABLE customer ( "+
+                "id char(8) UNIQUE PRIMARY KEY, "+
+                "name text, " +
+                "dummySerial serial UNIQUE) WITH OIDS",
+            "COMMENT ON TABLE customer IS 'contains customer attributes'",
+            "COMMENT ON COLUMN customer.id IS 'unique id'",
+            "CREATE TABLE product ("+
+                "id char(8) UNIQUE PRIMARY KEY,"+
+                "name text,"+
+                "price numeric(10,2),"+
+                "image bytea) WITH OIDS",
+            "CREATE TABLE ordertab ( "+
+                "id char(8) UNIQUE PRIMARY KEY,"+
+                "customerid char(8) CONSTRAINT cust REFERENCES customer(id) ON DELETE CASCADE ON UPDATE RESTRICT,"+
+                "orderdate char(8),"+
+                "delivered boolean ) WITH OIDS",
+            "CREATE TABLE orderpos ( "+
+                "orderid char(8) REFERENCES ordertab(id),"+
+                "id char(3),"+
+                "productid char(8) REFERENCES product(id),"+
+                "amount pqsdbc_amount,"+
+                "shortamount pqsdbc_short,"+
+                "PRIMARY KEY (orderid,id)) WITH OIDS",
+            "CREATE TABLE nooid ("+
+                "id char(8) UNIQUE PRIMARY KEY,"+
+                "name text) "+
+                "WITHOUT OIDS",
+            "CREATE TABLE nooid2 ("+
+                "id serial UNIQUE PRIMARY KEY,"+
+                "name text) "+
+                "WITHOUT OIDS",
+            "CREATE VIEW customer2 AS SELECT id,name FROM customer",
+            "GRANT SELECT ON TABLE customer,product,orderpos,ordertab TO pqsdbc_customer",
+            "GRANT SELECT ON TABLE product TO GROUP pqsdbc_employees",
+            "GRANT SELECT,UPDATE, INSERT ON TABLE customer TO GROUP pqsdbc_employees",
+            "GRANT ALL ON TABLE orderpos,ordertab TO GROUP pqsdbc_employees, GROUP pqsdbc_admin",
+            "GRANT ALL ON TABLE customer TO GROUP pqsdbc_admin", # the admin is allowed to delete customers
+            "GRANT ALL ON TABLE product TO pqsdbc_boss", # only the boss may change the product table
+            "INSERT INTO public.customer VALUES ('C1','John Doe')",
+            "INSERT INTO \"public\" . \"customer\" VALUES ('C2','Bruce Springsteen')",
+            "INSERT INTO \"public\".product VALUES ('PZZ2','Pizza Mista',6.95,'\\003foo\\005')",
+            "INSERT INTO product VALUES ('PZZ5','Pizza Funghi',5.95,'\\001foo\\005')",
+            "INSERT INTO product VALUES ('PAS1','Lasagne',5.49,NULL)",
+            "INSERT INTO ordertab VALUES ( '1', 'C2', '20030403','true')",
+            "INSERT INTO ordertab VALUES ( '2', 'C1', '20030402','false')",
+            "INSERT INTO orderpos VALUES ( '1','001', 'PZZ2',2,0)",
+            "INSERT INTO orderpos VALUES ( '1','002', 'PZZ5',3,-1)",
+            "INSERT INTO orderpos VALUES ( '2','001', 'PAS1',5,1)",
+            "INSERT INTO orderpos VALUES ( '2','002', 'PZZ2',3,2)",
+            "COMMIT" )
+        for sql in ddls:
+            stmt.executeUpdate( sql )
+
+        connection.getTables() # force refresh of metadata
+    finally:
+        # release the statement even when one of the statements above fails
+        stmt.close()
diff --git connectivity/workben/postgresql/main.py connectivity/workben/postgresql/main.py
new file mode 100644
index 0000000..800dabc
--- /dev/null
+++ connectivity/workben/postgresql/main.py
@@ -0,0 +1,90 @@
+#*************************************************************************
+#
+# $RCSfile: main.py,v $
+#
+# $Revision: 1.1.2.2 $
+#
+# last change: $Author: jbu $ $Date: 2004/05/09 20:04:59 $
+#
+# The Contents of this file are made available subject to the terms of
+# either of the following licenses
+#
+# - GNU Lesser General Public License Version 2.1
+# - Sun Industry Standards Source License Version 1.1
+#
+# Sun Microsystems Inc., October, 2000
+#
+# GNU Lesser General Public License Version 2.1
+# =============================================
+# Copyright 2000 by Sun Microsystems, Inc.
+# 901 San Antonio Road, Palo Alto, CA 94303, USA
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+#
+# Sun Industry Standards Source License Version 1.1
+# =================================================
+# The contents of this file are subject to the Sun Industry Standards
+# Source License Version 1.1 (the "License"); You may not use this file
+# except in compliance with the License. You may obtain a copy of the
+# License at http://www.openoffice.org/license.html.
+#
+# Software provided under this License is provided on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
+# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
+# See the License for the specific provisions governing your rights and
+# obligations concerning the Software.
+#
+# The Initial Developer of the Original Code is: Joerg Budischewski
+#
+# Copyright: 2000 by Sun Microsystems, Inc.
+#
+# All Rights Reserved.
+#
+# Contributor(s): Joerg Budischewski
+#
+#
+#
+#*************************************************************************
+import uno
+import unohelper
+import unittest
+import statement
+import preparedstatement
+import metadata
+import sdbcx
+import sys
+import os
+
+ctx = uno.getComponentContext()
+
+# needed for the tests: add the driver's shared library components to ctx
+unohelper.addComponentsToContext(
+    ctx,ctx,
+    ("postgresql-sdbc.uno","postgresql-sdbc-impl.uno","typeconverter.uno"),
+    "com.sun.star.loader.SharedLibrary")
+
+if len(sys.argv) < 2 or not sys.argv[1]:
+    sys.exit("usage: python main.py dburl  (e.g. sdbc:postgresql:dbname=pqtest)")
+dburl = sys.argv[1] # os.environ['USER'] + "_pqtest"
+print("dburl=" + dburl)
+
+suite = unittest.TestSuite()
+for module in (statement, preparedstatement, metadata, sdbcx):
+    suite.addTest(module.suite(ctx,dburl))
+
+runner = unittest.TextTestRunner(sys.stderr,1,2)
+sys.exit(not runner.run(suite).wasSuccessful())  # status 1 when any test failed
diff --git connectivity/workben/postgresql/makefile.mk connectivity/workben/postgresql/makefile.mk
new file mode 100644
index 0000000..82ec2b6
--- /dev/null
+++ connectivity/workben/postgresql/makefile.mk
@@ -0,0 +1,103 @@
+#*************************************************************************
+#
+# $RCSfile: makefile.mk,v $
+#
+# $Revision: 1.1.2.2 $
+#
+# last change: $Author: jbu $ $Date: 2004/05/09 20:04:59 $
+#
+# The Contents of this file are made available subject to the terms of
+# either of the following licenses
+#
+# - GNU Lesser General Public License Version 2.1
+# - Sun Industry Standards Source License Version 1.1
+#
+# Sun Microsystems Inc., October, 2000
+#
+# GNU Lesser General Public License Version 2.1
+# =============================================
+# Copyright 2000 by Sun Microsystems, Inc.
+# 901 San Antonio Road, Palo Alto, CA 94303, USA
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+#
+# Sun Industry Standards Source License Version 1.1
+# =================================================
+# The contents of this file are subject to the Sun Industry Standards
+# Source License Version 1.1 (the "License"); You may not use this file
+# except in compliance with the License. You may obtain a copy of the
+# License at http://www.openoffice.org/license.html.
+#
+# Software provided under this License is provided on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
+# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
+# See the License for the specific provisions governing your rights and
+# obligations concerning the Software.
+#
+# The Initial Developer of the Original Code is: Sun Microsystems, Inc.
+#
+# Copyright: 2000 by Sun Microsystems, Inc.
+#
+# All Rights Reserved.
+#
+# Contributor(s): _______________________________________
+#
+#
+#
+#*************************************************************************
+
+PRJ=..$/..
+
+PRJNAME=connectivity
+TARGET=postgresql-test
+LIBTARGET=NO
+TARGETTYPE=CUI
+ENABLE_EXCEPTIONS=TRUE
+
+# --- Settings -----------------------------------------------------
+
+.INCLUDE : settings.mk
+.INCLUDE : sv.mk
+# --- Files --------------------------------------------------------
+
+# copies of the test scripts in $(DLLDEST), made by the %.py rule below
+PYFILES = \
+	$(DLLDEST)$/statement.py \
+	$(DLLDEST)$/preparedstatement.py \
+	$(DLLDEST)$/main.py \
+	$(DLLDEST)$/ddl.py \
+	$(DLLDEST)$/sdbcx.py \
+	$(DLLDEST)$/metadata.py
+
+ALL : \
+	$(PYFILES) \
+	doc
+
+.INCLUDE : target.mk
+
+$(DLLDEST)$/%.py: %.py
+	+cp $? $@
+
+# doc only prints how to start the tests
+.PHONY doc:
+	@echo "start test with dmake runtest dburl=your-url"
+	@echo " e.g. dmake runtest dburl=sdbc:postgresql:dbname=pqtest"
+	@echo " MUST: Create a separate database before (here pqtest),"
+	@echo " (SOME TABLES GET DROPPED)"
+
+runtest : ALL
+	+cd $(DLLDEST) && python main.py "$(dburl)"
diff --git connectivity/workben/postgresql/metadata.py connectivity/workben/postgresql/metadata.py
new file mode 100644
index 0000000..9485675
--- /dev/null
+++ connectivity/workben/postgresql/metadata.py
@@ -0,0 +1,151 @@
+#*************************************************************************
+#
+# $RCSfile: metadata.py,v $
+#
+# $Revision: 1.1.2.4 $
+#
+# last change: $Author: jbu $ $Date: 2006/05/27 11:33:11 $
+#
+# The Contents of this file are made available subject to the terms of
+# either of the following licenses
+#
+# - GNU Lesser General Public License Version 2.1
+# - Sun Industry Standards Source License Version 1.1
+#
+# Sun Microsystems Inc., October, 2000
+#
+# GNU Lesser General Public License Version 2.1
+# =============================================
+# Copyright 2000 by Sun Microsystems, Inc.
+# 901 San Antonio Road, Palo Alto, CA 94303, USA
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+#
+# Sun Industry Standards Source License Version 1.1
+# =================================================
+# The contents of this file are subject to the Sun Industry Standards
+# Source License Version 1.1 (the "License"); You may not use this file
+# except in compliance with the License. You may obtain a copy of the
+# License at http://www.openoffice.org/license.html.
+#
+# Software provided under this License is provided on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
+# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
+# See the License for the specific provisions governing your rights and
+# obligations concerning the Software.
+#
+# The Initial Developer of the Original Code is: Ralph Thomas
+#
+# Copyright: 2000 by Sun Microsystems, Inc.
+#
+# All Rights Reserved.
+#
+# Contributor(s): Ralph Thomas, Joerg Budischewski
+#
+#*************************************************************************
+import unittest
+import sys
+import ddl
+
+from com.sun.star.sdbc.DataType import SMALLINT, INTEGER, BIGINT , DATE, TIME, TIMESTAMP, NUMERIC
+
+def dumpResultSet( rs , count ):
+    """Write the first count columns of each row of rs to stdout, tab separated.
+
+    An empty line takes the place of a header; rs is rewound when done.
+    """
+    columns = range( 1, count + 1 )
+    sys.stdout.write( "\n" )
+    while rs.next():
+        fields = [ rs.getString( col ) + "\t" for col in columns ]
+        sys.stdout.write( "".join( fields ) + "\n" )
+    rs.beforeFirst()
+
+
+def suite(ctx,dburl):
+    """Return a TestSuite with the metadata tests, each bound to ctx and dburl."""
+    names = ( "testDatabaseMetaData", "testTypeGuess" )
+    return unittest.TestSuite( [ TestCase( name, ctx, dburl ) for name in names ] )
+
+
+class TestCase(unittest.TestCase):
+    """Checks the database metadata and the result set column types of the driver."""
+
+    def __init__(self,method,ctx,dburl):
+        """Create the test named method; ctx and dburl are stored for setUp."""
+        unittest.TestCase.__init__(self,method)
+        self.ctx, self.dburl = ctx, dburl
+
+    def setUp( self ):
+        """Connect to self.dburl through a new driver instance and run the DDLs."""
+        serviceManager = self.ctx.ServiceManager
+        self.driver = serviceManager.createInstanceWithContext(
+            'org.openoffice.comp.connectivity.pq.Driver' , self.ctx )
+        self.connection = self.driver.connect( self.dburl, () )
+        ddl.executeDDLs( self.connection )
+
+    def tearDown( self ):
+        """Close the connection opened by setUp."""
+        self.connection.close()
+
+    def _columnType( self, stmt, sql ):
+        """Execute sql on stmt and return the type of the first result column."""
+        rs = stmt.executeQuery( sql )
+        return rs.getMetaData().getColumnType( 1 )
+
+    def testDatabaseMetaData( self ):
+        # issues the metadata queries and verifies the type listed for a domain
+        meta = self.connection.getMetaData()
+
+        # The results of these calls are not inspected. To look at one of them,
+        # pass it to dumpResultSet together with the number of columns to print
+        # (used so far: getTables 5, getColumns 18, getPrimaryKeys 6,
+        # getTablePrivileges 7, getTypeInfo 18, getIndexInfo 13).
+        result = meta.getTables( None, "public", "%", () )
+        result = meta.getColumns( None, "%", "customer", "%" )
+        result = meta.getPrimaryKeys( None, "public" , "%" )
+        result = meta.getTablePrivileges( None, "public" , "%" )
+        result = meta.getColumns( None, "public" , "customer", "%" )
+
+        result = meta.getTypeInfo()
+        while result.next():
+            if result.getString(1) == "pqsdbc_short":
+                self.failUnless( result.getInt(2) == SMALLINT )
+                break
+        # fails if the loop ran off the end, i.e. the domain type was not found
+        self.failUnless( not result.isAfterLast() )
+
+        result = meta.getIndexInfo( None, "public" , "customer", False, False )
+
+    def testTypeGuess( self ):
+        # verifies the column types reported for aggregate expressions
+        stmt = self.connection.createStatement()
+        self.failUnless(
+            self._columnType( stmt, "SELECT sum(amount) FROM orderpos" ) == BIGINT )
+
+        # NOTE(review): firsttable is not created by ddl.executeDDLs; presumably
+        # another test module leaves it behind -- confirm.
+        # The remaining queries share a second statement.
+        stmt = self.connection.createStatement()
+        expectations = (
+            ( "SELECT sum(price) FROM product", NUMERIC ),
+            ( "SELECT max(ttime) FROM firsttable", TIME ),
+            ( "SELECT max(tdate) FROM firsttable", DATE ),
+            ( "SELECT max(ttimestamp) FROM firsttable", TIMESTAMP ) )
+
+        for sql, expected in expectations:
+            self.failUnless( self._columnType( stmt, sql ) == expected )
diff --git connectivity/workben/postgresql/preparedstatement.py connectivity/workben/postgresql/preparedstatement.py
new file mode 100644
index 0000000..d049c9f
--- /dev/null
+++ connectivity/workben/postgresql/preparedstatement.py
@@ -0,0 +1,228 @@
+#*************************************************************************
+#
+# $RCSfile: preparedstatement.py,v $
+#
+# $Revision: 1.1.2.9 $
+#
+# last change: $Author: jbu $ $Date: 2008/07/07 21:37:11 $
+#
+# The Contents of this file are made available subject to the terms of
+# either of the following licenses
+#
+# - GNU Lesser General Public License Version 2.1
+# - Sun Industry Standards Source License Version 1.1
+#
+# Sun Microsystems Inc., October, 2000
+#
+# GNU Lesser General Public License Version 2.1
+# =============================================
+# Copyright 2000 by Sun Microsystems, Inc.
+# 901 San Antonio Road, Palo Alto, CA 94303, USA
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+#
+# Sun Industry Standards Source License Version 1.1
+# =================================================
+# The contents of this file are subject to the Sun Industry Standards
+# Source License Version 1.1 (the "License"); You may not use this file
+# except in compliance with the License. You may obtain a copy of the
+# License at http://www.openoffice.org/license.html.
+#
+# Software provided under this License is provided on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
+# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
+# See the License for the specific provisions governing your rights and
+# obligations concerning the Software.
+#
+# The Initial Developer of the Original Code is: Ralph Thomas
+#
+# Copyright: 2000 by Sun Microsystems, Inc.
+#
+# All Rights Reserved.
+#
+# Contributor(s): Ralph Thomas, Joerg Budischewski
+#
+#*************************************************************************
+import unittest
+import sys
+import ddl
+from uno import ByteSequence
+from com.sun.star.sdbc import SQLException
+from com.sun.star.sdbc.ResultSetConcurrency import UPDATABLE
+from com.sun.star.sdbc.DataType import NUMERIC,VARCHAR
+
+def suite(ctx,dburl):
+    """Return a TestSuite with the prepared statement tests for ctx and dburl."""
+    # the tests are added in the order listed here
+    names = ( "testQuery", "testGeneratedResultSet",
+              "testUpdateableResultSet", "testQuoteQuote" )
+    return unittest.TestSuite( [ TestCase( name, ctx, dburl ) for name in names ] )
+
+
+def realEquals( a,b,eps ):
+    """Tell whether a and b are closer together than eps (strict comparison)."""
+    distance = abs( a - b )
+    return distance < eps
+
+
+class TestCase(unittest.TestCase):
+    """Tests of the driver's prepared statements, run against the ddl test schema."""
+
+    def __init__(self,method,ctx,dburl):
+        unittest.TestCase.__init__(self,method)
+        self.ctx, self.dburl = ctx, dburl
+
+    def setUp(self):
+        """Connect to self.dburl and let ddl.executeDDLs set up the test objects."""
+        self.driver = self.ctx.ServiceManager.createInstanceWithContext(
+            'org.openoffice.comp.connectivity.pq.Driver', self.ctx )
+        self.connection = self.driver.connect( self.dburl, () )
+        ddl.executeDDLs( self.connection )
+
+    def tearDown( self ):
+        """Close the connection of setUp; unittest runs this hook after every test."""
+        self.connection.close()
+    testDown = tearDown  # former, misspelled name of the hook; kept as an alias
+
+    def testQuery( self ):
+        # binds parameters of several types and checks the rows that come back
+        stmts = ( "SELECT product.id FROM product WHERE product.price > :lowprice AND product.price < :upprice",
+                  "SELECT product.id FROM product WHERE product.price > ? AND product.price < ?",
+                  "SELECT \"product\".\"id\" FROM product WHERE \"product\".\"price\" > :lowprice AND \"product\".\"price\" < :upprice" )
+        for stmt in stmts:
+            prepstmt = self.connection.prepareStatement( stmt )
+            prepstmt.setDouble( 1, 5.80 )
+            prepstmt.setObjectWithInfo( 2, 7. , NUMERIC, 2)
+            prepstmt.setObjectWithInfo( 2, "7.0000", NUMERIC, 2 )  # rebinds parameter 2, now from a string
+            rs = prepstmt.executeQuery( )
+            self.failUnless( rs.getMetaData().getColumnCount() == 1 )
+            self.failUnless( rs.getMetaData().getColumnName(1) == "id")
+            self.failUnless( prepstmt.getMetaData().getColumnCount() == 1 )
+            self.failUnless( prepstmt.getMetaData().getColumnName(1) == "id" )
+            self.failUnless( rs.next() )
+            self.failUnless( rs.getString( 1 ).strip() == "PZZ2" )
+            self.failUnless( rs.next() )
+            self.failUnless( rs.getString( 1 ).strip() == "PZZ5" )
+            self.failUnless( rs.isLast() )
+
+        prepstmt = self.connection.prepareStatement(
+            "SELECT name FROM product WHERE id = ?" )
+        prepstmt.setString( 1, 'PZZ2' )
+        rs = prepstmt.executeQuery()
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString( 1 ) == "Pizza Mista" )
+        self.failUnless( rs.isLast() )
+
+        prepstmt = self.connection.prepareStatement(
+            "SELECT name FROM product WHERE image = ?" )
+        prepstmt.setBytes( 1, ByteSequence( "\001foo\005" ) )
+        rs = prepstmt.executeQuery()
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString( 1 ) == "Pizza Funghi" )
+        self.failUnless( rs.isLast() )
+
+        prepstmt = self.connection.prepareStatement(
+            "SELECT * FROM ordertab WHERE delivered = ?" )
+        prepstmt.setBoolean( 1 , False )
+        rs = prepstmt.executeQuery()
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString( 1 ).strip() == "2" )
+        self.failUnless( rs.isLast() )
+
+        stmt = self.connection.createStatement()
+        rs = stmt.executeQuery( "SELECT * FROM \"public\".\"customer\"" )
+        stmt.executeUpdate( "DELETE FROM product where id='PAS5'" )
+        prepstmt =self.connection.prepareStatement(
+            "INSERT INTO product VALUES(?,'Ravioli',?,NULL)" )
+        prepstmt.setObjectWithInfo( 1, "PAS5" ,VARCHAR,0)
+        prepstmt.setObjectWithInfo( 2, "9.223" ,NUMERIC,2)
+        prepstmt.executeUpdate()
+        rs= stmt.executeQuery( "SELECT price FROM product WHERE id = 'PAS5'" )
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString( 1 ).strip() == "9.22" )
+
+        stmt.executeUpdate( "DELETE FROM product where id='PAS5'" )
+        prepstmt =self.connection.prepareStatement(
+            "INSERT INTO product VALUES('PAS5','Ravioli',?,NULL)" )
+        prepstmt.setObjectWithInfo( 1, 9.223,NUMERIC,2 )
+        prepstmt.executeUpdate()
+        rs= stmt.executeQuery( "SELECT price FROM product WHERE id = 'PAS5'" )
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString( 1 ).strip() == "9.22" )
+
+    def testGeneratedResultSet( self ):
+        # checks getGeneratedValues() after inserts into tables with and without oids
+        prepstmt = self.connection.prepareStatement(
+            "INSERT INTO customer VALUES( ?, ? )" )
+        prepstmt.setString( 1, "C3" )
+        prepstmt.setString( 2, "Norah Jones" )
+        prepstmt.executeUpdate()
+        rs = prepstmt.getGeneratedValues()
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getInt( 3 ) == 3 )
+
+        prepstmt = self.connection.prepareStatement(
+            "INSERT INTO public.nooid (id,name) VALUES( ?, ? )" )
+        prepstmt.setString( 1, "C3" )
+        prepstmt.setString( 2, "Norah Jones" )
+        prepstmt.executeUpdate()
+        rs = prepstmt.getGeneratedValues()
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString(1).rstrip() == "C3" )
+
+        prepstmt = self.connection.prepareStatement(
+            "INSERT INTO public.nooid2 (name) VALUES( ? )" )
+        prepstmt.setString( 1, "Norah Jones" )
+        prepstmt.executeUpdate()
+        rs = prepstmt.getGeneratedValues()
+        self.failUnless( rs )
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString(2) == "Norah Jones" )
+        self.failUnless( rs.getString(1) == "1" )
+
+    def testUpdateableResultSet( self ):
+        # deletes, updates and inserts rows through an updatable result set; nothing is asserted
+        stmt = self.connection.createStatement()
+        stmt.ResultSetConcurrency = UPDATABLE
+        rs = stmt.executeQuery( "SELECT * FROM orderpos" )
+        rs.next()
+        rs.deleteRow()
+        rs.next()
+        rs.updateInt( 4 , 32 )
+        rs.updateRow()
+
+        rs.moveToInsertRow()
+        rs.updateString( 1 , '2' )
+        rs.updateString( 2, '003' )
+        rs.updateString( 3, 'PZZ5' )
+        rs.updateInt( 4, 22 )
+        rs.insertRow()
+        rs = stmt.executeQuery( "SELECT * FROM orderpos" )
+        rs = stmt.executeQuery( "SELECT * FROM \"public\".\"orderpos\"" )
+
+    def testQuoteQuote( self ):
+        # doubled single quotes in a literal must come back as one quote each
+        stmt = self.connection.prepareStatement( "select 'foo''l'" )
+        rs = stmt.executeQuery()
+        self.failUnless( rs )
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString(1) == "foo'l" )
+        stmt = self.connection.prepareStatement( "select 'foo''''l'" )
+        rs = stmt.executeQuery()
+        self.failUnless( rs )
+        self.failUnless( rs.next() )
+        self.failUnless( rs.getString(1) == "foo''l" )
diff --git connectivity/workben/postgresql/sdbcx.py connectivity/workben/postgresql/sdbcx.py
new file mode 100644
index 0000000..b864efc
--- /dev/null
+++ connectivity/workben/postgresql/sdbcx.py
@@ -0,0 +1,306 @@
+#*************************************************************************
+#
+# $RCSfile: sdbcx.py,v $
+#
+# $Revision: 1.1.2.6 $
+#
+# last change: $Author: jbu $ $Date: 2007/01/07 13:50:38 $
+#
+# The Contents of this file are made available subject to the terms of
+# either of the following licenses
+#
+# - GNU Lesser General Public License Version 2.1
+# - Sun Industry Standards Source License Version 1.1
+#
+# Sun Microsystems Inc., October, 2000
+#
+# GNU Lesser General Public License Version 2.1
+# =============================================
+# Copyright 2000 by Sun Microsystems, Inc.
+# 901 San Antonio Road, Palo Alto, CA 94303, USA
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+#
+# Sun Industry Standards Source License Version 1.1
+# =================================================
+# The contents of this file are subject to the Sun Industry Standards
+# Source License Version 1.1 (the "License"); You may not use this file
+# except in compliance with the License. You may obtain a copy of the
+# License at http://www.openoffice.org/license.html.
+#
+# Software provided under this License is provided on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
+# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
+# See the License for the specific provisions governing your rights and
+# obligations concerning the Software.
+#
+# The Initial Developer of the Original Code is: Joerg Budischewski
+#
+# Copyright: 2000 by Sun Microsystems, Inc.
+#
+# All Rights Reserved.
+#
+# Contributor(s): Joerg Budischewski
+#
+#
+#
+#*************************************************************************
+import unittest
+import ddl
+import unohelper
+import sys
+from com.sun.star.sdbc import SQLException
+from com.sun.star.sdbc.DataType import VARCHAR, CHAR, DECIMAL, DOUBLE, BIGINT, NUMERIC
+from com.sun.star.sdbc.ColumnValue import NO_NULLS, NULLABLE
+from com.sun.star.sdbcx.KeyType import PRIMARY, FOREIGN, UNIQUE
+from com.sun.star.sdbc.KeyRule import RESTRICT, CASCADE, NO_ACTION
+
+def suite(ctx,dburl):  # build the sdbcx test suite; ctx and dburl are handed unchanged to every TestCase
+ suite = unittest.TestSuite()  # local name shadows this function inside its own body
+ suite.addTest(TestCase("testTables",ctx,dburl))
+ suite.addTest(TestCase("testViews",ctx,dburl))
+ suite.addTest(TestCase("testKeys",ctx,dburl))
+ suite.addTest(TestCase("testUsers",ctx,dburl))
+ suite.addTest(TestCase("testIndexes",ctx,dburl))
+ return suite
+
+def nullable2Str( v ):  # text for a column's IsNullable value: "NOT NULL" for NO_NULLS, empty string for anything else
+ if v == NO_NULLS:
+ return "NOT NULL"
+ return ""
+
+def autoIncremtent2Str( v ):  # text for a column's IsAutoIncrement flag: "auto increment" when truthy, else empty string
+ if v:
+ return "auto increment"
+ return ""
+
+def dumpColumns( columns ):  # debugging aid: print one tab-separated line per column of an indexed columns container
+ n = columns.getCount()
+ print "Name\t type\t prec\t scale\t"  # header labels only the first four of the eight fields printed per column
+ for i in range( 0, n ):  # the container is accessed with 0-based indexes
+ col = columns.getByIndex( i )
+ print col.Name + "\t "+col.TypeName + "\t " + str(col.Precision) + "\t " + str(col.Scale) + "\t "+\
+ str( col.DefaultValue ) + "\t " + str( col.Description ) + "\t " +\
+ autoIncremtent2Str( col.IsAutoIncrement ) + "\t " + \
+ nullable2Str( col.IsNullable )
+
+
+class TestCase(unittest.TestCase):  # sdbcx (data definition) tests for the PostgreSQL driver: tables, views, keys, indexes and users
+ def __init__(self,method,ctx,dburl):  # method: name of the test method to run; ctx: context used to create the driver; dburl: URL passed to driver.connect
+ unittest.TestCase.__init__(self,method)
+ self.ctx = ctx
+ self.dburl = dburl
+
+ def setUp( self ):  # create the driver, open a connection and run ddl.executeDDLs on it (presumably (re)creates the test schema -- see ddl.py)
+ self.driver = self.ctx.ServiceManager.createInstanceWithContext(
+ 'org.openoffice.comp.connectivity.pq.Driver' , self.ctx )
+ self.connection = self.driver.connect( self.dburl, () )  # no extra connection properties
+ ddl.executeDDLs( self.connection )
+
+ def tearDown( self ):  # close the connection opened in setUp
+ self.connection.close()
+
+ def checkDescriptor( self, descriptor, name, typeName, type, prec, scale, defaultValue, desc ):  # assert the properties of a column descriptor; defaultValue is accepted but not compared (its check is commented out); 'type' shadows the builtin
+ self.failUnless( descriptor.Name == name )
+ self.failUnless( descriptor.TypeName == typeName )
+ self.failUnless( descriptor.Type == type )
+ self.failUnless( descriptor.Precision == prec )
+ self.failUnless( descriptor.Scale == scale )
+# print descriptor.DefaultValue + " == " + defaultValue
+# self.failUnless( descriptor.DefaultValue == defaultValue )
+ self.failUnless( descriptor.Description == desc )
+
+
+ def testKeys( self ):  # check foreign, primary and unique keys of the test tables, then walk every key/column of orderpos
+ dd = self.driver.getDataDefinitionByConnection( self.connection )
+ tables = dd.getTables()
+ t = tables.getByName( "public.ordertab" )  # tables are addressed as schema.name
+ keys = t.getKeys()
+ key = keys.getByName( "cust" )  # foreign key from ordertab to customer
+ self.failUnless( key.Name == "cust" )
+ self.failUnless( key.Type == FOREIGN )
+ self.failUnless( key.ReferencedTable == "public.customer" )
+ self.failUnless( key.UpdateRule == RESTRICT )
+ self.failUnless( key.DeleteRule == CASCADE )
+
+ keycolumns = keys.getByName( "ordertab_pkey" ).getColumns()
+ self.failUnless( keycolumns.getElementNames() == (u"id",) )  # the primary key consists of the single column id
+
+ key = keys.getByName( "ordertab_pkey" )
+ self.failUnless( key.Name == "ordertab_pkey" )
+ self.failUnless( key.Type == PRIMARY )
+ self.failUnless( key.UpdateRule == NO_ACTION )
+ self.failUnless( key.DeleteRule == NO_ACTION )
+
+ keys = tables.getByName( "public.customer" ).getKeys()
+ key = keys.getByName( "customer_dummyserial_key" )
+ self.failUnless( key.Name == "customer_dummyserial_key" )
+ self.failUnless( key.Type == UNIQUE )
+ self.failUnless( key.UpdateRule == NO_ACTION )
+ self.failUnless( key.DeleteRule == NO_ACTION )
+
+ keys = tables.getByName( "public.orderpos" ).getKeys()
+ keyEnum = keys.createEnumeration()  # nothing is asserted below; the loops only verify that enumerating keys and their columns works
+ while keyEnum.hasMoreElements():
+ key = keyEnum.nextElement()
+ cols = key.getColumns()
+ colEnum = cols.createEnumeration()
+ while colEnum.hasMoreElements():
+ col = colEnum.nextElement()  # value unused on purpose
+
+ def testViews( self ):  # the view public.customer2 must be listed with its name, schema and a non-empty command
+ dd = self.driver.getDataDefinitionByConnection( self.connection )
+ views = dd.getViews()
+
+ v = views.getByName( "public.customer2" )
+ self.failUnless( v.Name == "customer2" )
+ self.failUnless( v.SchemaName == "public" )
+ self.failUnless( v.Command != "" )
+
+ def testIndexes( self ):  # the primary key index of ordertab must be reported as primary, unique, not clustered, on column id
+ dd = self.driver.getDataDefinitionByConnection( self.connection )
+ tables = dd.getTables()
+ t = tables.getByName( "public.ordertab" )
+ indexes = t.getIndexes()
+ index = indexes.getByName( "ordertab_pkey" )
+
+ self.failUnless( index.Name == "ordertab_pkey" )
+ self.failUnless( index.IsPrimaryKeyIndex )
+ self.failUnless( index.IsUnique )
+ self.failUnless( not index.IsClustered )
+
+ columns = index.getColumns()
+ self.failUnless( columns.hasByName( "id" ) )
+
+ self.failUnless( columns.getByIndex(0).Name == "id" )
+
+ def checkRenameTable( self, t , tables):  # helper, not a test: rename t several times, check the tables container follows, then drop it as public.foo2
+ t.rename( "foo" )  # unqualified name: the object is expected to stay in schema public
+ self.failUnless( tables.hasByName( "public.foo" ) )
+
+ t.rename( "public.foo2" )  # schema-qualified name, same schema
+ self.failUnless( tables.hasByName( "public.foo2" ) )
+
+ try:
+ t.rename( "pqsdbc_test.foo2" )  # move to another schema; whether this works depends on the server
+ self.failUnless( tables.hasByName( "pqsdbc_test.foo2" ) )
+ print "looks like a server 8.1 or later (changing a schema succeeded)"
+ t.rename( "pqsdbc_test.foo" )
+ self.failUnless( tables.hasByName( "pqsdbc_test.foo" ) )
+ t.rename( "public.foo2" )  # move back so the final drop finds public.foo2 on both paths
+ self.failUnless( tables.hasByName( "public.foo2" ) )
+ except SQLException,e:
+ if e.Message.find( "support changing" ) >= 0:  # only a failure whose message contains this text is tolerated
+ print "looks like a server prior to 8.1 (changing schema failed with Message [" + e.Message.replace("\n", " ") + "])"
+ else:
+ raise e  # any other SQL error fails the test
+ tables.dropByName( "public.foo2" )
+
+ def testTables( self ):  # inspect table customer, append columns to it, create table blub, alter one of its columns, then rename/drop customer2 and blub
+ dd = self.driver.getDataDefinitionByConnection( self.connection )
+ tables = dd.getTables()
+ t = tables.getByName( "public.customer" )
+ self.failUnless( t.Name == "customer" )
+ self.failUnless( t.SchemaName == "public" )
+ self.failUnless( t.Type == "TABLE" )
+
+ cols = t.getColumns()
+ self.failUnless( cols.hasByName( 'name' ) )
+ self.failUnless( cols.hasByName( 'id' ) )
+ col = cols.getByName( "dummyserial" )  # result unused; only checks that the lookup succeeds
+# dumpColumns( cols )
+ self.checkDescriptor( cols.getByName( "id" ), "id", "bpchar", CHAR, 8, 0, "", "unique id" )
+ self.checkDescriptor( cols.getByName( "name" ), "name", "text", VARCHAR, 0, 0, "", "" )
+
+ dd = cols.createDataDescriptor()  # from here on dd is a column descriptor, no longer the data definition
+ dd.Name = "foo"
+ dd.TypeName = "CHAR"
+ dd.Type = CHAR
+ dd.Precision = 25
+ dd.IsNullable = NULLABLE
+ cols.appendByDescriptor( dd )
+
+ dd.Name = "foo2"  # the same descriptor object is reused; properties not reassigned keep their earlier values
+ dd.TypeName = "DECIMAL"
+ dd.Type = DECIMAL
+ dd.Precision = 12
+ dd.Scale = 5
+ dd.DefaultValue = "2.3423"
+ dd.Description = "foo2 description"
+ cols.appendByDescriptor( dd )
+
+ dd.Name = "cash"  # appended but not verified by any assertion below
+ dd.TypeName = "MONEY"
+ dd.Type = DOUBLE
+# dd.IsNullable = NO_NULLS
+ dd.DefaultValue = "'2.42'"
+ cols.appendByDescriptor( dd )
+
+ cols.refresh()  # re-read the columns before looking up the appended ones
+
+ self.checkDescriptor( cols.getByName( "foo"), "foo", "bpchar", CHAR, 25,0,"","")  # appended as "CHAR", expected back as "bpchar"
+ self.checkDescriptor(
+ cols.getByName( "foo2"), "foo2", "numeric", NUMERIC, 12,5,"2.3423","foo2 description")
+# dumpColumns( cols )
+
+ datadesc = tables.createDataDescriptor()  # descriptor for a new table public.blub
+ datadesc.SchemaName = "public"
+ datadesc.Name = "blub"
+ datadesc.Description = "This describes blub"
+
+ tables.appendByDescriptor( datadesc )
+
+ # make the appended descriptors known
+ tables.refresh()
+
+ t = tables.getByName( "public.blub" )
+ self.failUnless( t.Name == "blub" )
+ self.failUnless( t.SchemaName == "public" )
+ self.failUnless( t.Description == "This describes blub" )
+
+ cols = t.getColumns()
+ dd = cols.createDataDescriptor()
+ dd.Name = "mytext"
+ dd.TypeName = "text"
+ dd.Type = VARCHAR
+ dd.IsNullable = NO_NULLS
+ cols.appendByDescriptor( dd )
+
+ cols.refresh()
+
+ dd.DefaultValue = "'myDefault'"  # new properties for the existing column mytext ...
+ dd.Name = "mytext2"  # ... including a new name
+ dd.IsNullable = NULLABLE
+ dd.Description = "mytext-Description"
+ t.alterColumnByName( "mytext" , dd )  # apply them to the column currently named mytext
+
+ cols.refresh()
+
+ self.checkDescriptor( cols.getByName( "mytext2" ), "mytext2", "text", VARCHAR, 0,0,"'myDefault'","mytext-Description" )  # the column must now be found under its new name
+
+ t = tables.getByName( "public.customer2" )  # customer2 is the view checked in testViews; here it is fetched through the tables container
+ self.checkRenameTable( t,tables )  # note: checkRenameTable drops the object at the end
+
+ t = tables.getByName( "public.blub" )
+ self.checkRenameTable( t,tables )
+
+
+
+ def testUsers( self ):  # the users container must list the user pqsdbc_joe
+ dd = self.driver.getDataDefinitionByConnection( self.connection )
+ users = dd.getUsers()
+ self.failUnless( "pqsdbc_joe" in users.getElementNames() )
diff --git connectivity/workben/postgresql/statement.py connectivity/workben/postgresql/statement.py
new file mode 100644
index 0000000..b471838
--- /dev/null
+++ connectivity/workben/postgresql/statement.py
@@ -0,0 +1,277 @@
+#*************************************************************************
+#
+# $RCSfile: statement.py,v $
+#
+# $Revision: 1.1.2.5 $
+#
+# last change: $Author: jbu $ $Date: 2006/05/27 11:33:11 $
+#
+# The Contents of this file are made available subject to the terms of
+# either of the following licenses
+#
+# - GNU Lesser General Public License Version 2.1
+# - Sun Industry Standards Source License Version 1.1
+#
+# Sun Microsystems Inc., October, 2000
+#
+# GNU Lesser General Public License Version 2.1
+# =============================================
+# Copyright 2000 by Sun Microsystems, Inc.
+# 901 San Antonio Road, Palo Alto, CA 94303, USA
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
+# MA 02111-1307 USA
+#
+#
+# Sun Industry Standards Source License Version 1.1
+# =================================================
+# The contents of this file are subject to the Sun Industry Standards
+# Source License Version 1.1 (the "License"); You may not use this file
+# except in compliance with the License. You may obtain a copy of the
+# License at http://www.openoffice.org/license.html.
+#
+# Software provided under this License is provided on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
+# WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
+# MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
+# See the License for the specific provisions governing your rights and
+# obligations concerning the Software.
+#
+# The Initial Developer of the Original Code is: Joerg Budischewski
+#
+# Copyright: 2000 by Sun Microsystems, Inc.
+#
+# All Rights Reserved.
+#
+# Contributor(s): Joerg Budischewski
+#
+#
+#
+#*************************************************************************
+import unohelper
+import unittest
+import ddl
+from com.sun.star.sdbc import SQLException, XArray
+from com.sun.star.sdbc.DataType import VARCHAR
+from com.sun.star.util import Date
+from com.sun.star.util import Time
+from com.sun.star.util import DateTime
+
+# todo
+class MyArray( unohelper.Base, XArray ):
+ def __init__( self, data ):
+ self.data = data
+ def getBaseType( self ):
+ return VARCHAR
+ def getBaseTypeName( self ):
+ return "varchar"
+ def getArray( self, foo ):
+ return self.data
+ def getArrayAtIndex( self, startIndex, count, foo ):
+ return self.data[startIndex:startIndex+count-1]
+ def getResultSet( self, foo):
+ return None
+ def getResultSetAtIndex( self, startIndex, count, foo ):
+ return None
+
+def suite(ctx,dburl):  # build the statement test suite; ctx and dburl are handed unchanged to every TestCase
+ suite = unittest.TestSuite()  # local name shadows this function inside its own body
+ suite.addTest(TestCase("testRobustness",ctx,dburl))
+ suite.addTest(TestCase("testRow",ctx,dburl))
+ suite.addTest(TestCase("testNavigation",ctx,dburl))
+ suite.addTest(TestCase("testDatabaseMetaData",ctx,dburl))
+ suite.addTest(TestCase("testGeneratedResultSet",ctx,dburl))
+ suite.addTest(TestCase("testResultSetMetaData",ctx,dburl))
+ suite.addTest(TestCase("testArray",ctx,dburl))
+ return suite
+
+def realEquals( a,b,eps ):  # True when a and b differ by strictly less than eps
+ val = a - b
+ if val < 0:
+ val = -1. * val  # take the absolute value of the difference
+ return val < eps
+
+class TestCase(unittest.TestCase):  # tests for plain statements and their result sets, run against a freshly created table firsttable
+ def __init__(self,method,ctx,dburl):  # method: name of the test method to run; ctx: context used to create the driver; dburl: URL passed to driver.connect
+ unittest.TestCase.__init__(self,method)
+ self.ctx = ctx
+ self.dburl = dburl
+
+ def setUp(self):  # connect, drop any existing firsttable, then create it and insert three rows
+ self.driver = self.ctx.ServiceManager.createInstanceWithContext(
+ 'org.openoffice.comp.connectivity.pq.Driver' , self.ctx )
+ self.connection = self.driver.connect( self.dburl, () )
+ self.stmt = self.connection.createStatement()
+ try:
+ self.stmt.executeUpdate( "DROP TABLE firsttable" )
+ except SQLException,e:
+ pass  # best effort: a failing DROP is ignored (presumably the table does not exist yet)
+
+ ddls = (  # 14-column table plus three rows; the inserts give 13 values, so tSerial (column 14) is left to the database
+ "BEGIN",
+ "CREATE TABLE firsttable (tString text,tInteger integer,tShort smallint,tLong bigint,tFloat real,"+
+ "tDouble double precision,tByteSeq bytea,tBool boolean, tDate date, tTime time, tTimestamp timestamp, tIntArray integer[], tStringArray text[], tSerial serial ) WITH OIDS",
+ "INSERT INTO firsttable VALUES ( 'foo', 70000, 12000, 70001, 2.4, 2.45, 'huhu', 'true', '1999-01-08','04:05:06','1999-01-08 04:05:06', '{2,3,4}', '{\"huhu\",\"hi\"}')",
+ "INSERT INTO firsttable VALUES ( 'foo2', 69999, 12001, 70002, -2.4, 2.55, 'huhu', 'false', '1999-01-08','04:05:06','1999-01-08 04:05:06', NULL , '{\"bla\"}' )",
+ "INSERT INTO firsttable VALUES ( 'foo2', 69999, 12001, 70002, -2.4, 2.55, 'huhu', null, '1999-01-08', '04:05:06','1999-01-08 04:05:06', '{}' , '{\"bl ubs\",\"bl\\\\\\\\a}}b\\\\\"a\",\"blub\"}' )",
+ "COMMIT" )
+ for i in ddls:
+ self.stmt.executeUpdate(i)
+
+ def tearDown(self):  # release the statement and the connection created in setUp
+ self.stmt.close()
+ self.connection.close()
+
+ def testRow(self):  # read the inserted rows back through the typed getters, the array getter, wasNull and findColumn
+ row = ("foo",70000,12000,70001,2.4,2.45, "huhu", True ,  # expected values of the first inserted row, columns 1-11
+ Date(8,1,1999), Time(0,6,5,4),DateTime(0,6,5,4,8,1,1999) )
+ row2 = ("foo2",69999,12001,70002,-2.4,2.55, "huhu", False )  # not referenced below
+
+ rs = self.stmt.executeQuery( "SELECT * from firsttable" )
+ self.failUnless( rs.next() )
+
+ self.failUnless( rs.getString(1) == row[0] )
+ self.failUnless( rs.getInt(2) == row[1] )
+ self.failUnless( rs.getShort(3) == row[2] )
+ self.failUnless( rs.getLong(4) == row[3] )
+ self.failUnless( realEquals(rs.getFloat(5), row[4], 0.001))  # floating point columns are compared with a tolerance
+ self.failUnless( realEquals(rs.getDouble(6), row[5], 0.00001))
+ self.failUnless( rs.getBytes(7) == row[6] )
+ self.failUnless( rs.getBoolean(8) == row[7] )
+ self.failUnless( rs.getDate(9) == row[8] )
+ self.failUnless( rs.getTime(10) == row[9] )
+ self.failUnless( rs.getTimestamp(11) == row[10] )
+
+ a = rs.getArray(12)  # tIntArray of the first row, inserted as '{2,3,4}'
+ data = a.getArray( None )
+ self.failUnless( len( data ) == 3 )
+ self.failUnless( int(data[0] ) == 2 )
+ self.failUnless( int(data[1] ) == 3 )
+ self.failUnless( int(data[2] ) == 4 )
+
+ self.failUnless( rs.next() )  # second row: only checked for existence
+
+ self.failUnless( rs.next() )  # third row
+ data = rs.getArray(13).getArray(None)  # tStringArray with a blank, a backslash, braces and a double quote in its elements
+ self.failUnless( data[0] == "bl ubs" )
+ self.failUnless( data[1] == "bl\\a}}b\"a" ) # check special keys
+ self.failUnless( data[2] == "blub" )
+
+ rs.getString(8)  # tBool was inserted as null in the third row ...
+ self.failUnless( rs.wasNull() )  # ... so wasNull must report it for the preceding getter call
+ rs.getString(7)
+ self.failUnless( not rs.wasNull() )
+
+ self.failUnless( rs.findColumn( "tShort" ) == 3 )  # name is given in mixed case; the expected index is 1-based
+ rs.close()
+
+ def testNavigation( self ):  # cursor position predicates and absolute/relative moves over the three rows
+ rs = self.stmt.executeQuery( "SELECT * from firsttable" )
+ self.failUnless( rs.isBeforeFirst() )
+ self.failUnless( not rs.isAfterLast() )
+ self.failUnless( rs.isBeforeFirst() )
+
+ self.failUnless( rs.next() )
+ self.failUnless( rs.isFirst() )
+ self.failUnless( not rs.isLast() )
+ self.failUnless( not rs.isBeforeFirst() )
+
+ self.failUnless( rs.next() )
+ self.failUnless( rs.next() )
+ self.failUnless( not rs.next() )  # a fourth next() runs past the last of the three rows
+ self.failUnless( rs.isAfterLast() )
+
+ rs.absolute( 1 )
+ self.failUnless( rs.isFirst() )
+
+ rs.absolute( 3 )
+ self.failUnless( rs.isLast() )
+
+ rs.relative( -1 )
+ self.failUnless( rs.getRow() == 2 )
+
+ rs.relative( 1 )
+ self.failUnless( rs.getRow() == 3 )
+
+ rs.close()
+
+ def testRobustness( self ):  # invalid cursor state, invalid column indexes and a closed connection must all raise SQLException
+ rs = self.stmt.executeQuery( "SELECT * from firsttable" )
+
+ self.failUnlessRaises( SQLException, rs.getString , 1 )  # no next() yet, so there is no current row
+
+ rs.next()
+ self.failUnlessRaises( SQLException, rs.getString , 24 )  # beyond the 14 columns of firsttable
+ self.failUnlessRaises( SQLException, rs.getString , 0 )  # column indexes start at 1
+
+ self.connection.close()  # tearDown will close the statement and the connection a second time
+ self.failUnlessRaises( SQLException, rs.getString , 1 )
+ self.failUnlessRaises( SQLException, self.stmt.executeQuery, "SELECT * from firsttable" )
+ rs.close()
+
+
+ def testDatabaseMetaData( self ):  # the connection's metadata must not report the database as read-only
+ meta = self.connection.getMetaData()
+
+ self.failUnless( not meta.isReadOnly() )
+
+ def testGeneratedResultSet( self ):  # after an INSERT, getGeneratedValues must expose the inserted row including its serial value
+ self.stmt.executeUpdate(
+ "INSERT INTO firsttable VALUES ( 'foo3', 69998, 12001, 70002, -2.4, 2.55, 'huhu2')" )
+ #ddl.dumpResultSet( self.stmt.getGeneratedValues() )
+ rs = self.stmt.getGeneratedValues()
+ self.failUnless( rs.next() )
+ self.failUnless( rs.getInt( 14 ) == 4 )  # column 14 is tSerial; setUp inserted three rows, so this is the fourth
+
+ def testResultSetMetaData( self ):  # call every result set metadata getter for every column; return values are discarded
+ rs = self.stmt.executeQuery( "SELECT * from firsttable" )
+
+ # just check, if we get results !
+ meta = rs.getMetaData()
+
+ count = meta.getColumnCount()
+ for i in range( 1, count+1):  # metadata column indexes are 1-based
+ meta.isNullable( i )
+ meta.isCurrency( i )
+ meta.isCaseSensitive( i )
+ meta.isSearchable( i )
+ meta.isSigned( i )
+ meta.getColumnDisplaySize( i )
+ meta.getColumnName( i )
+ meta.getColumnLabel( i )
+ meta.getSchemaName( i )
+ meta.getPrecision( i )
+ meta.getScale( i )
+ meta.getTableName( i )
+ meta.getColumnTypeName( i )
+ meta.getColumnType( i )
+ meta.isReadOnly( i )
+ meta.isWritable( i )
+ meta.isDefinitelyWritable( i )
+ meta.getColumnServiceName( i )
+
+ def testArray( self ):  # write a string array through a prepared statement parameter and read it back unchanged
+ stmt = self.connection.prepareStatement(
+ "INSERT INTO firsttable VALUES ( 'insertedArray', 70000, 12000, 70001, 2.4, 2.45, 'huhu', 'true', '1999-01-08','04:05:06','1999-01-08 04:05:06', '{2,3,4}', ? )" )
+ myarray = ( "a", "\"c", "}d{" )  # elements containing a double quote and braces
+ stmt.setArray( 1, MyArray( myarray ) )  # the single placeholder is the tStringArray column
+ stmt.executeUpdate()
+
+ stmt = self.connection.createStatement()
+ rs = stmt.executeQuery( "SELECT tStringArray FROM firsttable WHERE tString = 'insertedArray'" )
+ rs.next()
+ data = rs.getArray(1).getArray(None)
+ self.failUnless( data[0] == myarray[0] )
+ self.failUnless( data[1] == myarray[1] )
+ self.failUnless( data[2] == myarray[2] )
--
1.7.0.1