Only in psycopg2-2.7.5_patched/: cscope.in.out Only in psycopg2-2.7.5_patched/: cscope.out Only in psycopg2-2.7.5_patched/: cscope.po.out diff -ur psycopg2-2.7.5/psycopg/connection.h psycopg2-2.7.5_patched/psycopg/connection.h --- psycopg2-2.7.5/psycopg/connection.h 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/psycopg/connection.h 2023-06-27 14:47:59.000000000 +0200 @@ -108,6 +108,7 @@ * for a green connection. If NULL, the connection is idle. */ PyObject *async_cursor; int async_status; /* asynchronous execution status */ + PGresult *pgres; /* notice processing */ PyObject *notice_list; diff -ur psycopg2-2.7.5/psycopg/connection_int.c psycopg2-2.7.5_patched/psycopg/connection_int.c --- psycopg2-2.7.5/psycopg/connection_int.c 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/psycopg/connection_int.c 2023-07-03 12:51:23.000000000 +0200 @@ -891,11 +891,14 @@ /* Advance to the next state after a call to a pq_is_busy* function */ static int -_conn_poll_advance_read(connectionObject *self, int busy) +_conn_poll_advance_read(connectionObject *self) { - int res; + int res, busy; Dprintf("conn_poll: poll reading"); + + busy = pq_get_result_async(self); + switch (busy) { case 0: /* result is ready */ res = PSYCO_POLL_OK; @@ -909,7 +912,7 @@ res = PSYCO_POLL_ERROR; break; default: - Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy); + Dprintf("conn_poll: unexpected result from pq_get_result_async: %d", busy); res = PSYCO_POLL_ERROR; break; } @@ -935,21 +938,21 @@ case ASYNC_READ: Dprintf("conn_poll: async_status = ASYNC_READ"); if (self->async) { - res = _conn_poll_advance_read(self, pq_is_busy(self)); + res = _conn_poll_advance_read(self); } else { /* we are a green connection being polled as result of a query. this means that our caller has the lock and we are being called from the callback. If we tried to acquire the lock now it would be a deadlock. */ - res = _conn_poll_advance_read(self, pq_is_busy_locked(self)); + res = _conn_poll_advance_read(self); } break; case ASYNC_DONE: Dprintf("conn_poll: async_status = ASYNC_DONE"); /* We haven't asked anything: just check for notifications. */ - res = _conn_poll_advance_read(self, pq_is_busy(self)); + res = _conn_poll_advance_read(self); break; default: @@ -972,7 +975,6 @@ _conn_poll_setup_async(connectionObject *self) { int res = PSYCO_POLL_ERROR; - PGresult *pgres; switch (self->status) { case CONN_STATUS_CONNECTING: @@ -1023,12 +1025,11 @@ res = _conn_poll_query(self); if (res == PSYCO_POLL_OK) { res = PSYCO_POLL_ERROR; - pgres = pq_get_last_result(self); - if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { + if (self->pgres == NULL || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) { PyErr_SetString(OperationalError, "can't set datestyle to ISO"); break; } - CLEARPGRES(pgres); + CLEARPGRES(self->pgres); Dprintf("conn_poll: status -> CONN_STATUS_READY"); self->status = CONN_STATUS_READY; @@ -1089,8 +1090,9 @@ } curs = (cursorObject *)py_curs; - CLEARPGRES(curs->pgres); - curs->pgres = pq_get_last_result(self); + PQclear(curs->pgres); + curs->pgres = self->pgres; + self->pgres = NULL; /* fetch the tuples (if there are any) and build the result. We * don't care if pq_fetch return 0 or 1, but if there was an error, diff -ur psycopg2-2.7.5/psycopg/cursor_type.c psycopg2-2.7.5_patched/psycopg/cursor_type.c --- psycopg2-2.7.5/psycopg/cursor_type.c 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/psycopg/cursor_type.c 2023-06-27 15:08:58.000000000 +0200 @@ -49,7 +49,6 @@ static PyObject * psyco_curs_close(cursorObject *self) { - EXC_IF_ASYNC_IN_PROGRESS(self, close); if (self->closed) { goto exit; @@ -59,6 +58,8 @@ char buffer[128]; PGTransactionStatusType status; + EXC_IF_ASYNC_IN_PROGRESS(self, close_named); + if (!self->query) { Dprintf("skipping named cursor close because unused"); goto close; diff -ur psycopg2-2.7.5/psycopg/green.c psycopg2-2.7.5_patched/psycopg/green.c --- psycopg2-2.7.5/psycopg/green.c 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/psycopg/green.c 2023-06-27 15:10:35.000000000 +0200 @@ -177,10 +177,12 @@ goto end; } - /* Now we can read the data without fear of blocking. */ - result = pq_get_last_result(conn); + /* the result is now in the connection: take its ownership */ + result = conn->pgres; + conn->pgres = NULL; end: + CLEARPGRES(conn->pgres); conn->async_status = ASYNC_DONE; Py_CLEAR(conn->async_cursor); return result; diff -ur psycopg2-2.7.5/psycopg/pqpath.c psycopg2-2.7.5_patched/psycopg/pqpath.c --- psycopg2-2.7.5/psycopg/pqpath.c 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/psycopg/pqpath.c 2023-07-03 12:39:47.000000000 +0200 @@ -842,80 +842,6 @@ return rv; } - -/* pq_is_busy - consume input and return connection status - - a status of 1 means that a call to pq_fetch will block, while a status of 0 - means that there is data available to be collected. -1 means an error, the - exception will be set accordingly. - - this function locks the connection object - this function call Py_*_ALLOW_THREADS macros */ - -int -pq_is_busy(connectionObject *conn) -{ - int res; - Dprintf("pq_is_busy: consuming input"); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(conn->lock)); - - if (PQconsumeInput(conn->pgconn) == 0) { - Dprintf("pq_is_busy: PQconsumeInput() failed"); - pthread_mutex_unlock(&(conn->lock)); - Py_BLOCK_THREADS; - - /* if the libpq says pgconn is lost, close the py conn */ - if (CONNECTION_BAD == PQstatus(conn->pgconn)) { - conn->closed = 2; - } - - PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); - return -1; - } - - res = PQisBusy(conn->pgconn); - - Py_BLOCK_THREADS; - conn_notifies_process(conn); - conn_notice_process(conn); - Py_UNBLOCK_THREADS; - - pthread_mutex_unlock(&(conn->lock)); - Py_END_ALLOW_THREADS; - - return res; -} - -/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock - * - * The function should be called with the lock and holding the GIL. - */ - -int -pq_is_busy_locked(connectionObject *conn) -{ - Dprintf("pq_is_busy_locked: consuming input"); - - if (PQconsumeInput(conn->pgconn) == 0) { - Dprintf("pq_is_busy_locked: PQconsumeInput() failed"); - - /* if the libpq says pgconn is lost, close the py conn */ - if (CONNECTION_BAD == PQstatus(conn->pgconn)) { - conn->closed = 2; - } - - PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); - return -1; - } - - /* notices and notifies will be processed at the end of the loop we are in - * (async reading) by pq_fetch. */ - - return PQisBusy(conn->pgconn); -} - /* pq_flush - flush output and return connection status a status of 1 means that a some data is still pending to be flushed, while a @@ -1094,6 +1020,8 @@ Dprintf("pq_send_query: sending ASYNC query:"); Dprintf(" %-.200s", query); + CLEARPGRES(conn->pgres); + if (0 == (rv = PQsendQuery(conn->pgconn, query))) { Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn)); } @@ -1998,3 +1926,90 @@ return ex; } +/* pq_get_result_async - read an available result without blocking. + * + * Return 0 if the result is ready, 1 if it will block, -1 on error. + * The last result will be returned in pgres. + * + * The function should be called with the lock and holding the GIL. + */ + +RAISES_NEG int +pq_get_result_async(connectionObject *conn) +{ + int rv = -1; + + Dprintf("pq_get_result_async: calling PQconsumeInput()"); + if (PQconsumeInput(conn->pgconn) == 0) { + Dprintf("pq_get_result_async: PQconsumeInput() failed"); + + /* if the libpq says pgconn is lost, close the py conn */ + if (CONNECTION_BAD == PQstatus(conn->pgconn)) { + conn->closed = 2; + } + + PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); + goto exit; + } + + conn_notifies_process(conn); + conn_notice_process(conn); + + for (;;) { + int busy; + PGresult *res; + ExecStatusType status; + + Dprintf("pq_get_result_async: calling PQisBusy()"); + busy = PQisBusy(conn->pgconn); + + if (busy) { + /* try later */ + Dprintf("pq_get_result_async: PQisBusy() = 1"); + rv = 1; + goto exit; + } + + if (!(res = PQgetResult(conn->pgconn))) { + Dprintf("pq_get_result_async: got no result"); + /* the result is ready: it was the previously read one */ + rv = 0; + goto exit; + } + + status = PQresultStatus(res); + Dprintf("pq_get_result_async: got result %s", PQresStatus(status)); + + /* Store the result outside because we want to return the last non-null + * one and we may have to do it across poll calls. However if there is + * an error in the stream of results we want to handle the *first* + * error. So don't clobber it with the following ones. */ + if (conn->pgres && PQresultStatus(conn->pgres) == PGRES_FATAL_ERROR) { + Dprintf("previous pgres is error: discarding"); + PQclear(res); + } + else { + PQclear(conn->pgres); + conn->pgres = res; + } + + switch (status) { + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_COPY_BOTH: + /* After entering copy mode, libpq will make a phony + * PGresult for us every time we query for it, so we need to + * break out of this endless loop. */ + rv = 0; + goto exit; + + default: + /* keep on reading to check if there are other results or + * we have finished. */ + continue; + } + } + +exit: + return rv; +} diff -ur psycopg2-2.7.5/psycopg/pqpath.h psycopg2-2.7.5_patched/psycopg/pqpath.h --- psycopg2-2.7.5/psycopg/pqpath.h 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/psycopg/pqpath.h 2023-07-03 12:38:19.000000000 +0200 @@ -59,9 +59,8 @@ const char *cmd, const char *tid, PGresult **pgres, char **error, PyThreadState **tstate); -HIDDEN int pq_is_busy(connectionObject *conn); -HIDDEN int pq_is_busy_locked(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn); +RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn); RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg); diff -ur psycopg2-2.7.5/tests/test_async.py psycopg2-2.7.5_patched/tests/test_async.py --- psycopg2-2.7.5/tests/test_async.py 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/tests/test_async.py 2023-06-27 16:18:11.000000000 +0200 @@ -99,7 +99,6 @@ self.assertEquals(cur.fetchone()[0], "a") @slow - @skip_before_postgres(8, 2) def test_async_callproc(self): cur = self.conn.cursor() cur.callproc("pg_sleep", (0.1, )) @@ -406,6 +405,15 @@ cur.execute("delete from table1") self.wait(cur) + def test_stop_on_first_error(self): + cur = self.conn.cursor() + cur.execute("select 1; select x; select 1/0; select 2") + self.assertRaises(psycopg2.ProgrammingError, self.wait, cur) + + cur.execute("select 1") + self.wait(cur) + self.assertEqual(cur.fetchone(), (1,)) + def test_error_two_cursors(self): cur = self.conn.cursor() cur2 = self.conn.cursor() @@ -450,6 +458,36 @@ else: self.fail("no exception raised") + @slow + def test_non_block_after_notification(self): + from select import select + + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = 0 + while True: + state = self.conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_READ: + select([self.conn], [], [], 0.1) + elif state == psycopg2.extensions.POLL_WRITE: + select([], [self.conn], [], 0.1) + else: + raise Exception("Unexpected result from poll: %r", state) + polls += 1 + + self.assert_(polls >= 8, polls) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff -ur psycopg2-2.7.5/tests/test_green.py psycopg2-2.7.5_patched/tests/test_green.py --- psycopg2-2.7.5/tests/test_green.py 2018-06-17 18:07:41.000000000 +0200 +++ psycopg2-2.7.5_patched/tests/test_green.py 2023-06-27 16:57:21.000000000 +0200 @@ -22,12 +22,14 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import select import unittest import psycopg2 import psycopg2.extensions import psycopg2.extras from testutils import ConnectingTestCase, slow +from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE class ConnectionStub(object): @@ -55,10 +57,10 @@ ConnectingTestCase.tearDown(self) psycopg2.extensions.set_wait_callback(self._cb) - def set_stub_wait_callback(self, conn): + def set_stub_wait_callback(self, conn, cb=None): stub = ConnectionStub(conn) psycopg2.extensions.set_wait_callback( - lambda conn: psycopg2.extras.wait_select(stub)) + lambda conn: (cb or psycopg2.extras.wait_select)(stub)) return stub @slow @@ -111,6 +113,35 @@ curs.execute("select 1") self.assertEqual(curs.fetchone()[0], 1) + @slow + def test_non_block_after_notification(self): + def wait(conn): + while 1: + state = conn.poll() + if state == POLL_OK: + break + elif state == POLL_READ: + select.select([conn.fileno()], [], [], 0.1) + elif state == POLL_WRITE: + select.select([], [conn.fileno()], [], 0.1) + else: + raise conn.OperationalError("bad state from poll: %s" % state) + + stub = self.set_stub_wait_callback(self.conn, wait) + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = stub.polls.count(POLL_READ) + self.assert_(polls > 8, polls) + class CallbackErrorTestCase(ConnectingTestCase): def setUp(self):