Compare commits

...

No commits in common. "c8-stream-2.7" and "c8-beta-stream-2.7" have entirely different histories.

2 changed files with 478 additions and 1 deletions

View File

@ -0,0 +1,470 @@
Only in psycopg2-2.7.5_patched/: cscope.in.out
Only in psycopg2-2.7.5_patched/: cscope.out
Only in psycopg2-2.7.5_patched/: cscope.po.out
diff -ur psycopg2-2.7.5/psycopg/connection.h psycopg2-2.7.5_patched/psycopg/connection.h
--- psycopg2-2.7.5/psycopg/connection.h 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/psycopg/connection.h 2023-06-27 14:47:59.000000000 +0200
@@ -108,6 +108,7 @@
* for a green connection. If NULL, the connection is idle. */
PyObject *async_cursor;
int async_status; /* asynchronous execution status */
+ PGresult *pgres;
/* notice processing */
PyObject *notice_list;
diff -ur psycopg2-2.7.5/psycopg/connection_int.c psycopg2-2.7.5_patched/psycopg/connection_int.c
--- psycopg2-2.7.5/psycopg/connection_int.c 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/psycopg/connection_int.c 2023-07-03 12:51:23.000000000 +0200
@@ -891,11 +891,14 @@
/* Advance to the next state after a call to a pq_is_busy* function */
static int
-_conn_poll_advance_read(connectionObject *self, int busy)
+_conn_poll_advance_read(connectionObject *self)
{
- int res;
+ int res, busy;
Dprintf("conn_poll: poll reading");
+
+ busy = pq_get_result_async(self);
+
switch (busy) {
case 0: /* result is ready */
res = PSYCO_POLL_OK;
@@ -909,7 +912,7 @@
res = PSYCO_POLL_ERROR;
break;
default:
- Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy);
+ Dprintf("conn_poll: unexpected result from pq_get_result_async: %d", busy);
res = PSYCO_POLL_ERROR;
break;
}
@@ -935,21 +938,21 @@
case ASYNC_READ:
Dprintf("conn_poll: async_status = ASYNC_READ");
if (self->async) {
- res = _conn_poll_advance_read(self, pq_is_busy(self));
+ res = _conn_poll_advance_read(self);
}
else {
/* we are a green connection being polled as result of a query.
this means that our caller has the lock and we are being called
from the callback. If we tried to acquire the lock now it would
be a deadlock. */
- res = _conn_poll_advance_read(self, pq_is_busy_locked(self));
+ res = _conn_poll_advance_read(self);
}
break;
case ASYNC_DONE:
Dprintf("conn_poll: async_status = ASYNC_DONE");
/* We haven't asked anything: just check for notifications. */
- res = _conn_poll_advance_read(self, pq_is_busy(self));
+ res = _conn_poll_advance_read(self);
break;
default:
@@ -972,7 +975,6 @@
_conn_poll_setup_async(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
- PGresult *pgres;
switch (self->status) {
case CONN_STATUS_CONNECTING:
@@ -1023,12 +1025,11 @@
res = _conn_poll_query(self);
if (res == PSYCO_POLL_OK) {
res = PSYCO_POLL_ERROR;
- pgres = pq_get_last_result(self);
- if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
+ if (self->pgres == NULL || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) {
PyErr_SetString(OperationalError, "can't set datestyle to ISO");
break;
}
- CLEARPGRES(pgres);
+ CLEARPGRES(self->pgres);
Dprintf("conn_poll: status -> CONN_STATUS_READY");
self->status = CONN_STATUS_READY;
@@ -1089,8 +1090,9 @@
}
curs = (cursorObject *)py_curs;
- CLEARPGRES(curs->pgres);
- curs->pgres = pq_get_last_result(self);
+ PQclear(curs->pgres);
+ curs->pgres = self->pgres;
+ self->pgres = NULL;
/* fetch the tuples (if there are any) and build the result. We
* don't care if pq_fetch return 0 or 1, but if there was an error,
diff -ur psycopg2-2.7.5/psycopg/cursor_type.c psycopg2-2.7.5_patched/psycopg/cursor_type.c
--- psycopg2-2.7.5/psycopg/cursor_type.c 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/psycopg/cursor_type.c 2023-06-27 15:08:58.000000000 +0200
@@ -49,7 +49,6 @@
static PyObject *
psyco_curs_close(cursorObject *self)
{
- EXC_IF_ASYNC_IN_PROGRESS(self, close);
if (self->closed) {
goto exit;
@@ -59,6 +58,8 @@
char buffer[128];
PGTransactionStatusType status;
+ EXC_IF_ASYNC_IN_PROGRESS(self, close_named);
+
if (!self->query) {
Dprintf("skipping named cursor close because unused");
goto close;
diff -ur psycopg2-2.7.5/psycopg/green.c psycopg2-2.7.5_patched/psycopg/green.c
--- psycopg2-2.7.5/psycopg/green.c 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/psycopg/green.c 2023-06-27 15:10:35.000000000 +0200
@@ -177,10 +177,12 @@
goto end;
}
- /* Now we can read the data without fear of blocking. */
- result = pq_get_last_result(conn);
+ /* the result is now in the connection: take its ownership */
+ result = conn->pgres;
+ conn->pgres = NULL;
end:
+ CLEARPGRES(conn->pgres);
conn->async_status = ASYNC_DONE;
Py_CLEAR(conn->async_cursor);
return result;
diff -ur psycopg2-2.7.5/psycopg/pqpath.c psycopg2-2.7.5_patched/psycopg/pqpath.c
--- psycopg2-2.7.5/psycopg/pqpath.c 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/psycopg/pqpath.c 2023-07-03 12:39:47.000000000 +0200
@@ -842,80 +842,6 @@
return rv;
}
-
-/* pq_is_busy - consume input and return connection status
-
- a status of 1 means that a call to pq_fetch will block, while a status of 0
- means that there is data available to be collected. -1 means an error, the
- exception will be set accordingly.
-
- this function locks the connection object
- this function call Py_*_ALLOW_THREADS macros */
-
-int
-pq_is_busy(connectionObject *conn)
-{
- int res;
- Dprintf("pq_is_busy: consuming input");
-
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&(conn->lock));
-
- if (PQconsumeInput(conn->pgconn) == 0) {
- Dprintf("pq_is_busy: PQconsumeInput() failed");
- pthread_mutex_unlock(&(conn->lock));
- Py_BLOCK_THREADS;
-
- /* if the libpq says pgconn is lost, close the py conn */
- if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
- conn->closed = 2;
- }
-
- PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
- return -1;
- }
-
- res = PQisBusy(conn->pgconn);
-
- Py_BLOCK_THREADS;
- conn_notifies_process(conn);
- conn_notice_process(conn);
- Py_UNBLOCK_THREADS;
-
- pthread_mutex_unlock(&(conn->lock));
- Py_END_ALLOW_THREADS;
-
- return res;
-}
-
-/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock
- *
- * The function should be called with the lock and holding the GIL.
- */
-
-int
-pq_is_busy_locked(connectionObject *conn)
-{
- Dprintf("pq_is_busy_locked: consuming input");
-
- if (PQconsumeInput(conn->pgconn) == 0) {
- Dprintf("pq_is_busy_locked: PQconsumeInput() failed");
-
- /* if the libpq says pgconn is lost, close the py conn */
- if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
- conn->closed = 2;
- }
-
- PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
- return -1;
- }
-
- /* notices and notifies will be processed at the end of the loop we are in
- * (async reading) by pq_fetch. */
-
- return PQisBusy(conn->pgconn);
-}
-
/* pq_flush - flush output and return connection status
a status of 1 means that a some data is still pending to be flushed, while a
@@ -1094,6 +1020,8 @@
Dprintf("pq_send_query: sending ASYNC query:");
Dprintf(" %-.200s", query);
+ CLEARPGRES(conn->pgres);
+
if (0 == (rv = PQsendQuery(conn->pgconn, query))) {
Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn));
}
@@ -1998,3 +1926,90 @@
return ex;
}
+/* pq_get_result_async - read an available result without blocking.
+ *
+ * Return 0 if the result is ready, 1 if it will block, -1 on error.
+ * The last result will be returned in pgres.
+ *
+ * The function should be called with the lock and holding the GIL.
+ */
+
+RAISES_NEG int
+pq_get_result_async(connectionObject *conn)
+{
+ int rv = -1;
+
+ Dprintf("pq_get_result_async: calling PQconsumeInput()");
+ if (PQconsumeInput(conn->pgconn) == 0) {
+ Dprintf("pq_get_result_async: PQconsumeInput() failed");
+
+ /* if the libpq says pgconn is lost, close the py conn */
+ if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
+ conn->closed = 2;
+ }
+
+ PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
+ goto exit;
+ }
+
+ conn_notifies_process(conn);
+ conn_notice_process(conn);
+
+ for (;;) {
+ int busy;
+ PGresult *res;
+ ExecStatusType status;
+
+ Dprintf("pq_get_result_async: calling PQisBusy()");
+ busy = PQisBusy(conn->pgconn);
+
+ if (busy) {
+ /* try later */
+ Dprintf("pq_get_result_async: PQisBusy() = 1");
+ rv = 1;
+ goto exit;
+ }
+
+ if (!(res = PQgetResult(conn->pgconn))) {
+ Dprintf("pq_get_result_async: got no result");
+ /* the result is ready: it was the previously read one */
+ rv = 0;
+ goto exit;
+ }
+
+ status = PQresultStatus(res);
+ Dprintf("pq_get_result_async: got result %s", PQresStatus(status));
+
+ /* Store the result outside because we want to return the last non-null
+ * one and we may have to do it across poll calls. However if there is
+ * an error in the stream of results we want to handle the *first*
+ * error. So don't clobber it with the following ones. */
+ if (conn->pgres && PQresultStatus(conn->pgres) == PGRES_FATAL_ERROR) {
+ Dprintf("previous pgres is error: discarding");
+ PQclear(res);
+ }
+ else {
+ PQclear(conn->pgres);
+ conn->pgres = res;
+ }
+
+ switch (status) {
+ case PGRES_COPY_OUT:
+ case PGRES_COPY_IN:
+ case PGRES_COPY_BOTH:
+ /* After entering copy mode, libpq will make a phony
+ * PGresult for us every time we query for it, so we need to
+ * break out of this endless loop. */
+ rv = 0;
+ goto exit;
+
+ default:
+ /* keep on reading to check if there are other results or
+ * we have finished. */
+ continue;
+ }
+ }
+
+exit:
+ return rv;
+}
diff -ur psycopg2-2.7.5/psycopg/pqpath.h psycopg2-2.7.5_patched/psycopg/pqpath.h
--- psycopg2-2.7.5/psycopg/pqpath.h 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/psycopg/pqpath.h 2023-07-03 12:38:19.000000000 +0200
@@ -59,9 +59,8 @@
const char *cmd, const char *tid,
PGresult **pgres, char **error,
PyThreadState **tstate);
-HIDDEN int pq_is_busy(connectionObject *conn);
-HIDDEN int pq_is_busy_locked(connectionObject *conn);
HIDDEN int pq_flush(connectionObject *conn);
+RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn);
HIDDEN void pq_clear_async(connectionObject *conn);
RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg);
diff -ur psycopg2-2.7.5/tests/test_async.py psycopg2-2.7.5_patched/tests/test_async.py
--- psycopg2-2.7.5/tests/test_async.py 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/tests/test_async.py 2023-06-27 16:18:11.000000000 +0200
@@ -99,7 +99,6 @@
self.assertEquals(cur.fetchone()[0], "a")
@slow
- @skip_before_postgres(8, 2)
def test_async_callproc(self):
cur = self.conn.cursor()
cur.callproc("pg_sleep", (0.1, ))
@@ -406,6 +405,15 @@
cur.execute("delete from table1")
self.wait(cur)
+ def test_stop_on_first_error(self):
+ cur = self.conn.cursor()
+ cur.execute("select 1; select x; select 1/0; select 2")
+ self.assertRaises(psycopg2.ProgrammingError, self.wait, cur)
+
+ cur.execute("select 1")
+ self.wait(cur)
+ self.assertEqual(cur.fetchone(), (1,))
+
def test_error_two_cursors(self):
cur = self.conn.cursor()
cur2 = self.conn.cursor()
@@ -450,6 +458,36 @@
else:
self.fail("no exception raised")
+ @slow
+ def test_non_block_after_notification(self):
+ from select import select
+
+ cur = self.conn.cursor()
+ cur.execute("""
+ select 1;
+ do $$
+ begin
+ raise notice 'hello';
+ end
+ $$ language plpgsql;
+ select pg_sleep(1);
+ """)
+
+ polls = 0
+ while True:
+ state = self.conn.poll()
+ if state == psycopg2.extensions.POLL_OK:
+ break
+ elif state == psycopg2.extensions.POLL_READ:
+ select([self.conn], [], [], 0.1)
+ elif state == psycopg2.extensions.POLL_WRITE:
+ select([], [self.conn], [], 0.1)
+ else:
+ raise Exception("Unexpected result from poll: %r", state)
+ polls += 1
+
+ self.assert_(polls >= 8, polls)
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
diff -ur psycopg2-2.7.5/tests/test_green.py psycopg2-2.7.5_patched/tests/test_green.py
--- psycopg2-2.7.5/tests/test_green.py 2018-06-17 18:07:41.000000000 +0200
+++ psycopg2-2.7.5_patched/tests/test_green.py 2023-06-27 16:57:21.000000000 +0200
@@ -22,12 +22,14 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
+import select
import unittest
import psycopg2
import psycopg2.extensions
import psycopg2.extras
from testutils import ConnectingTestCase, slow
+from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
class ConnectionStub(object):
@@ -55,10 +57,10 @@
ConnectingTestCase.tearDown(self)
psycopg2.extensions.set_wait_callback(self._cb)
- def set_stub_wait_callback(self, conn):
+ def set_stub_wait_callback(self, conn, cb=None):
stub = ConnectionStub(conn)
psycopg2.extensions.set_wait_callback(
- lambda conn: psycopg2.extras.wait_select(stub))
+ lambda conn: (cb or psycopg2.extras.wait_select)(stub))
return stub
@slow
@@ -111,6 +113,35 @@
curs.execute("select 1")
self.assertEqual(curs.fetchone()[0], 1)
+ @slow
+ def test_non_block_after_notification(self):
+ def wait(conn):
+ while 1:
+ state = conn.poll()
+ if state == POLL_OK:
+ break
+ elif state == POLL_READ:
+ select.select([conn.fileno()], [], [], 0.1)
+ elif state == POLL_WRITE:
+ select.select([], [conn.fileno()], [], 0.1)
+ else:
+ raise conn.OperationalError("bad state from poll: %s" % state)
+
+ stub = self.set_stub_wait_callback(self.conn, wait)
+ cur = self.conn.cursor()
+ cur.execute("""
+ select 1;
+ do $$
+ begin
+ raise notice 'hello';
+ end
+ $$ language plpgsql;
+ select pg_sleep(1);
+ """)
+
+ polls = stub.polls.count(POLL_READ)
+ self.assert_(polls > 8, polls)
+
class CallbackErrorTestCase(ConnectingTestCase):
def setUp(self):

View File

@ -33,7 +33,7 @@ features offered by PostgreSQL.
Summary: %{sum}
Name: python-%{srcname}
Version: 2.7.5
Release: 7%{?dist}
Release: 8%{?dist}
# The exceptions allow linking to OpenSSL and PostgreSQL's libpq
License: LGPLv3+ with exceptions
Group: Applications/Databases
@ -41,6 +41,9 @@ Url: http://www.psycopg.org/psycopg/
Source0: http://www.psycopg.org/psycopg/tarballs/PSYCOPG-2-7/psycopg2-%{version}.tar.gz
# Related to https://bugzilla.redhat.com/show_bug.cgi?id=1909674
Patch1: psycopg2_get_result_async.patch
%{?with_python2:BuildRequires: python2-debug python2-devel}
%if %{with python36_module}
%{?with_python3:BuildRequires: python36-debug python36-devel}
@ -261,6 +264,10 @@ cp -pr ZPsycopgDA/* %{buildroot}%{ZPsycopgDAdir}
%changelog
* Wed Jun 28 2023 Filip Janus <fjanus@redhat.com> - 2.7.5-8
- Added patch for support pq_get_result_async()
- Resolves: #1909674
* Thu Apr 25 2019 Tomas Orsava <torsava@redhat.com> - 2.7.5-7
- Bumping due to problems with modular RPM upgrade path
- Resolves: rhbz#1695587