Psycopg 3 - PostgreSQL database adapter for Python #
Psycopg 3 is a newly designed PostgreSQL database adapter for the Python programming language.
Psycopg 3 presents a familiar interface for everyone who has used Psycopg 2 or any other DB-API 2.0 database adapter, but allows you to use more modern PostgreSQL and Python features.
Documentation #
Getting started with Psycopg 3 #
This section of the documentation will explain how to install Psycopg and how to perform normal activities such as querying the database or loading data using COPY .
Important
If you are familiar with psycopg2 please take a look at Differences from psycopg2 to see what has changed.
Installation #
In short, if you use a supported system :
pip install --upgrade pip # upgrade pip to at least 20.3
pip install "psycopg[binary]"
and you should be ready to start . Read further for alternative ways to install.
Supported systems #
The Psycopg version documented here has official and tested support for:
- Python: from version 3.7 to 3.11
  - Python 3.6 supported before Psycopg 3.1
- PostgreSQL: from version 10 to 15
- OS: Linux, macOS, Windows
The tests to verify the supported systems run in Github workflows: anything that is not tested there is not officially supported. This includes:
- Unofficial Python distributions such as Conda;
- Alternative PostgreSQL implementations;
- macOS hardware and releases not available on Github workflows.
If you use an unsupported system, things might work (because, for instance, the database may use the same wire protocol as PostgreSQL) but we cannot guarantee correct behaviour or a smooth ride.
Binary installation #
The quickest way to start developing with Psycopg 3 is to install the binary packages by running:
pip install "psycopg[binary]"
This will install a self-contained package with all the libraries needed.
You will need pip 20.3 at least: please run pip install --upgrade pip to update it beforehand.
The above package should work in most situations. It will not work in some cases though.
If your platform is not supported you should proceed to a local installation or a pure Python installation .
See also
Did Psycopg 3 install ok? Great! You can now move on to the basic module usage to learn how it works.
Keep on reading if the above method didn’t work and you need a different way to install Psycopg 3.
For further information about the differences between the packages see pq module implementations .
Local installation #
A "Local installation" results in a performing and maintainable library. The
library will include the speed-up C module and will be linked to the system
libraries (
libpq
,
libssl
…) so that system upgrade of libraries will
upgrade the libraries used by Psycopg 3 too. This is the preferred way to
install Psycopg for a production site.
In order to perform a local installation you need some prerequisites:
- a C compiler,
- Python development headers (e.g. the python3-dev package),
- PostgreSQL client development headers (e.g. the libpq-dev package),
- the pg_config program available in the PATH.
You must be able to troubleshoot an extension build, for instance you must be able to read your compiler’s error message. If you are not, please don’t try this and follow the binary installation instead.
If your build prerequisites are in place you can run:
pip install "psycopg[c]"
Pure Python installation #
If you simply install:
pip install psycopg
without [c] or [binary] extras you will obtain a pure Python implementation. This is particularly handy to debug and hack, but it still requires the system libpq to operate (which will be imported dynamically via ctypes).
In order to use the pure Python installation you will need the libpq installed on the system: for instance on a Debian system you will probably need:
sudo apt install libpq5
Note
The libpq is the client library used by psql, the PostgreSQL command line client, to connect to the database. On most systems, installing psql will install the libpq too as a dependency.
If you are not able to fulfill this requirement please follow the binary installation .
Installing the connection pool #
The Psycopg connection pools are distributed in a separate package from the psycopg package itself, in order to allow a different release cycle.
In order to use the pool you must install the pool extra, using pip install "psycopg[pool]", or install the psycopg_pool package separately, which allows you to specify the release to install more precisely.
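As a minimal sketch of basic pool usage (assuming the psycopg_pool package is installed; the connection string is a placeholder):
from psycopg_pool import ConnectionPool

# Create a pool; "dbname=test" is a hypothetical connection string.
pool = ConnectionPool("dbname=test")

# Borrow a connection from the pool for the duration of the block.
with pool.connection() as conn:
    conn.execute("SELECT 1")

pool.close()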
Handling dependencies #
If you need to specify your project dependencies (for instance in a requirements.txt file, setup.py, pyproject.toml dependencies…) you should probably specify one of the following:
- If your project is a library, add a dependency on psycopg. This will make sure that your library will have the psycopg package with the right interface and leaves the possibility of choosing a specific implementation to the end user of your library.
- If your project is a final application (e.g. a service running on a server) you can require a specific implementation, for instance psycopg[c], after you have made sure that the prerequisites are met (e.g. the depending libraries and tools are installed in the host machine).
In both cases you can specify which version of Psycopg to use using requirement specifiers .
If you want to make sure that a specific implementation is used you can specify the PSYCOPG_IMPL environment variable: importing the library will fail if the implementation specified is not available. See pq module implementations.
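For example, a minimal sketch (assuming you want to enforce the C implementation; the variable must be set before psycopg is first imported):
import os

# Require the "c" implementation; the import below will fail if it is
# not installed. Other accepted values are "binary" and "python".
os.environ["PSYCOPG_IMPL"] = "c"

import psycopg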
Basic module usage #
The basic Psycopg usage is common to all the database adapters implementing the DB-API protocol. Other database adapters, such as the builtin sqlite3 or psycopg2, have roughly the same pattern of interaction.
Main objects in Psycopg 3 #
Here is an interactive session showing some of the basic commands:
# Note: the module name is psycopg, not psycopg3
import psycopg

# Connect to an existing database
with psycopg.connect("dbname=test user=postgres") as conn:

    # Open a cursor to perform database operations
    with conn.cursor() as cur:

        # Execute a command: this creates a new table
        cur.execute("""
            CREATE TABLE test (
                id serial PRIMARY KEY,
                num integer,
                data text)
            """)

        # Pass data to fill a query placeholders and let Psycopg perform
        # the correct conversion (no SQL injections!)
        cur.execute(
            "INSERT INTO test (num, data) VALUES (%s, %s)",
            (100, "abc'def"))

        # Query the database and obtain data as Python objects.
        cur.execute("SELECT * FROM test")
        cur.fetchone()
        # will return (1, 100, "abc'def")

        # You can use `cur.fetchmany()`, `cur.fetchall()` to return a list
        # of several records, or even iterate on the cursor
        for record in cur:
            print(record)

        # Make the changes to the database persistent
        conn.commit()
In the example you can see some of the main objects and methods and how they relate to each other:
- The function connect() creates a new database session and returns a new Connection instance. AsyncConnection.connect() creates an asyncio connection instead.
- The Connection class encapsulates a database session. It allows you to:
  - create new Cursor instances using the cursor() method to execute database commands and queries,
  - terminate transactions using the methods commit() or rollback().
- The class Cursor allows interaction with the database:
  - send commands to the database using methods such as execute() and executemany(),
  - retrieve data from the database, iterating on the cursor or using methods such as fetchone(), fetchmany(), fetchall().
- Using these objects as context managers (i.e. using with) will make sure to close them and free their resources at the end of the block (notice that this is different from psycopg2).
See also
A few important topics you will have to deal with are:
Shortcuts #
The pattern above is familiar to psycopg2 users. However, Psycopg 3 also exposes a few simple extensions which make the above pattern leaner:
- The Connection objects expose an execute() method, equivalent to creating a cursor, calling its execute() method, and returning it.

  # In Psycopg 2
  cur = conn.cursor()
  cur.execute(...)

  # In Psycopg 3
  cur = conn.execute(...)

- The Cursor.execute() method returns self. This means that you can chain a fetch operation, such as fetchone(), to the execute() call:

  # In Psycopg 2
  cur.execute(...)
  record = cur.fetchone()

  cur.execute(...)
  for record in cur:
      ...

  # In Psycopg 3
  record = cur.execute(...).fetchone()

  for record in cur.execute(...):
      ...
Using them together, in simple cases, you can go from creating a connection to using a result in a single expression:
print(psycopg.connect(DSN).execute("SELECT now()").fetchone()[0])
# 2042-07-12 18:15:10.706497+01:00
Connection context #
A Psycopg 3 Connection can be used as a context manager:
with psycopg.connect() as conn:
... # use the connection
# the connection is now closed
When the block is exited, if there is a transaction open, it will be committed. If an exception is raised within the block the transaction is rolled back. In both cases the connection is closed. It is roughly the equivalent of:
conn = psycopg.connect()
try:
... # use the connection
except BaseException:
conn.rollback()
else:
conn.commit()
finally:
conn.close()
Note
This behaviour is not what psycopg2 does: in psycopg2 there is no final close() and the connection can be used in several with statements to manage different transactions. This behaviour has been considered non-standard and surprising so it has been replaced by the more explicit transaction() block.
Note that, while the above pattern is what most people would use, connect() doesn't enter a block itself, but returns an "un-entered" connection, so that it is still possible to use a connection regardless of the code scope and the developer is free to use (and responsible for calling) commit(), rollback(), close() as and where needed.
Warning
If a connection is just left to go out of scope, the way it will behave with or without the use of a with block is different:
- if the connection is used without a with block, the server will find a connection closed INTRANS and roll back the current transaction;
- if the connection is used with a with block, there will be an explicit COMMIT and the operations will be finalised.
You should use a with block when your intention is just to execute a set of operations and then commit the result, which is the most usual thing to do with a connection. If your connection life cycle and transaction pattern is different, and you want more control over it, the use without with might be more convenient.
See Transactions management for more information.
AsyncConnection can also be used as a context manager, using async with, but be careful about its quirkiness: see with async connections for details.
Adapting psycopg to your program #
The above pattern of use only shows the default behaviour of the adapter. Psycopg can be customised in several ways, to allow the smoothest integration between your Python program and your PostgreSQL database:
- If your program is concurrent and based on asyncio instead of on threads/processes, you can use async connections and cursors.
- If you want to customise the objects that the cursor returns, instead of receiving tuples, you can specify your row factories (a minimal sketch follows this list).
- If you want to customise how Python values and PostgreSQL types are mapped into each other, besides the basic type mapping, you can configure your types.
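For instance, here is a small sketch of a row factory in use (dict_row is one of the factories shipped in psycopg.rows; the connection string is a placeholder):
import psycopg
from psycopg.rows import dict_row

# Rows come back as dictionaries instead of tuples.
with psycopg.connect("dbname=test", row_factory=dict_row) as conn:
    row = conn.execute("SELECT 1 AS value").fetchone()
    # row == {'value': 1}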
Passing parameters to SQL queries #
Most of the time, writing a program you will have to mix bits of SQL statements with values provided by the rest of the program:
SELECT some, fields FROM some_table WHERE id = ...
id equals what? Probably you will have a Python value you are looking for.
execute() arguments #
Passing parameters to a SQL statement happens in functions such as Cursor.execute() by using %s placeholders in the SQL statement, and passing a sequence of values as the second argument of the function. For example the Python function call:
cur.execute("""
INSERT INTO some_table (id, created_at, last_name)
VALUES (%s, %s, %s);
""",
(10, datetime.date(2020, 11, 18), "O'Reilly"))
is roughly equivalent to the SQL command:
INSERT INTO some_table (id, created_at, last_name)
VALUES (10, '2020-11-18', 'O''Reilly');
Note that the parameters will not be really merged into the query: the query and the parameters are sent to the server separately; see Server-side binding for details.
Named arguments are supported too, using %(name)s placeholders in the query and specifying the values in a mapping. Using named arguments allows you to specify the values in any order and to repeat the same value in several places in the query:
cur.execute("""
INSERT INTO some_table (id, created_at, updated_at, last_name)
VALUES (%(id)s, %(created)s, %(created)s, %(name)s);
""",
{'id': 10, 'name': "O'Reilly", 'created': datetime.date(2020, 11, 18)})
Using the characters %, (, ) in the argument names is not supported.
When parameters are used, in order to include a literal % in the query you can use the %% string:
cur.execute("SELECT (%s % 2) = 0 AS even", (10,)) # WRONG
cur.execute("SELECT (%s %% 2) = 0 AS even", (10,)) # correct
While the mechanism resembles regular Python string manipulation, there are a few subtle differences you should care about when passing parameters to a query.
- The Python string operator % must not be used: the execute() method accepts a tuple or dictionary of values as second parameter. Never use % or + to merge values into queries:

  cur.execute("INSERT INTO numbers VALUES (%s, %s)" % (10, 20)) # WRONG
  cur.execute("INSERT INTO numbers VALUES (%s, %s)", (10, 20))  # correct

- For positional variables binding, the second argument must always be a sequence, even if it contains a single variable (remember that Python requires a comma to create a single element tuple):

  cur.execute("INSERT INTO foo VALUES (%s)", "bar")    # WRONG
  cur.execute("INSERT INTO foo VALUES (%s)", ("bar"))  # WRONG
  cur.execute("INSERT INTO foo VALUES (%s)", ("bar",)) # correct
  cur.execute("INSERT INTO foo VALUES (%s)", ["bar"])  # correct

- The placeholder must not be quoted:

  cur.execute("INSERT INTO numbers VALUES ('%s')", ("Hello",)) # WRONG
  cur.execute("INSERT INTO numbers VALUES (%s)", ("Hello",))   # correct

- The variables placeholder must always be a %s, even if a different placeholder (such as a %d for integers or %f for floats) may look more appropriate for the type. You may find other placeholders used in Psycopg queries (%b and %t) but they are not related to the type of the argument: see Binary parameters and results if you want to read more:

  cur.execute("INSERT INTO numbers VALUES (%d)", (10,)) # WRONG
  cur.execute("INSERT INTO numbers VALUES (%s)", (10,)) # correct

- Only query values should be bound via this method: it shouldn't be used to merge table or field names to the query. If you need to generate SQL queries dynamically (for instance choosing a table name at runtime) you can use the functionalities provided in the psycopg.sql module:

  cur.execute("INSERT INTO %s VALUES (%s)", ('numbers', 10))  # WRONG
  cur.execute(                                                # correct
      SQL("INSERT INTO {} VALUES (%s)").format(Identifier('numbers')),
      (10,))
Danger: SQL injection #
The SQL representation of many data types is often different from their Python string representation. The typical example is with single quotes in strings: in SQL single quotes are used as string literal delimiters, so the ones appearing inside the string itself must be escaped, whereas in Python single quotes can be left unescaped if the string is delimited by double quotes.
Because of the difference, sometimes subtle, between the data types representations, a naïve approach to query strings composition, such as using Python strings concatenation, is a recipe for terrible problems:
SQL = "INSERT INTO authors (name) VALUES ('%s')" # NEVER DO THIS
data = ("O'Reilly", )
cur.execute(SQL % data) # THIS WILL FAIL MISERABLY
# SyntaxError: syntax error at or near "Reilly"
If the variables containing the data to send to the database come from an untrusted source (such as data coming from a form on a web site) an attacker could easily craft a malformed string, either gaining access to unauthorized data or performing destructive operations on the database. This form of attack is called SQL injection and is known to be one of the most widespread forms of attack on database systems. Before continuing, please print this page as a memo and hang it onto your desk.
Psycopg can automatically convert Python objects to SQL values : using this feature your code will be more robust and reliable. We must stress this point:
Warning
- Don't manually merge values to a query: hackers from a foreign country will break into your computer and steal not only your disks, but also your cds, leaving you only with the three most embarrassing records you ever bought. On cassette tapes.
- If you use the % operator to merge values to a query, con artists will seduce your cat, who will run away taking your credit card and your sunglasses with them.
- If you use + to merge a textual value to a string, bad guys in balaclava will find their way to your fridge, drink all your beer, and leave your toilet seat up and your toilet paper in the wrong orientation.
- You don't want to manually merge values to a query: use the provided methods instead.
The correct way to pass variables in a SQL command is using the second argument of the Cursor.execute() method:
SQL = "INSERT INTO authors (name) VALUES (%s)" # Note: no quotes
data = ("O'Reilly", )
cur.execute(SQL, data) # Note: no % operator
Note
Python static code checkers are not quite there yet, but, in the future, it will be possible to check your code for improper use of string expressions in queries. See Checking literal strings in queries for details.
See also
Now that you know how to pass parameters to queries, you can take a look at how Psycopg converts data types .
Binary parameters and results #
PostgreSQL has two different ways to transmit data between client and server: TEXT, always available, and BINARY, available most of the time but not always. Usually the binary format is more efficient to use.
Psycopg can support both formats for each data type. Whenever a value is passed to a query using the normal %s placeholder, the best format available is chosen (often, but not always, the binary format is picked as the best choice).
If you have a reason to select explicitly the binary format or the text format for a value you can use respectively a %b placeholder or a %t placeholder instead of the normal %s. execute() will fail if a Dumper for the right data type and format is not available.
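For instance, a minimal sketch of forcing the format per value (the blobs table and its bytea column are hypothetical):
# Force the binary format for the parameter with %b...
cur.execute("INSERT INTO blobs (data) VALUES (%b)", (b"\x00\x01\x02",))

# ...or force the text format with %t.
cur.execute("SELECT %t", ("hello",))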
The same two formats, text or binary, are used by PostgreSQL to return data from a query to the client. Unlike with parameters, where you can choose the format value-by-value, all the columns returned by a query will have the same format. Every type returned by the query should have a Loader configured, otherwise the data will be returned as unparsed str (for text results) or buffer (for binary results).
Note
The pg_type table defines which format is supported for each PostgreSQL data type. Text input/output is managed by the functions declared in the typinput and typoutput fields (always present); binary input/output is managed by the typsend and typreceive fields (which are optional).
Because not every PostgreSQL type supports binary output, by default the data will be returned in text format. In order to return data in binary format you can create the cursor using Connection.cursor(binary=True) or execute the query using Cursor.execute(binary=True). A case in which requesting binary results is a clear winner is when you have large binary data in the database, such as images:
cur.execute(
"SELECT image_data FROM images WHERE id = %s", [image_id], binary=True)
data = cur.fetchone()[0]
Adapting basic Python types #
Many standard Python types are adapted into SQL and returned as Python objects when a query is executed.
Converting the following data types between Python and PostgreSQL works out-of-the-box and doesn’t require any configuration. In case you need to customise the conversion you should take a look at Data adaptation configuration .
Booleans adaptation #
Python bool values True and False are converted to the equivalent PostgreSQL boolean type:
>>> cur.execute("SELECT %s, %s", (True, False))
# equivalent to "SELECT true, false"
Numbers adaptation #
See also
- Python int values can be converted to PostgreSQL smallint, integer, bigint, or numeric, according to their numeric value. Psycopg will choose the smallest data type available, because PostgreSQL can automatically cast a type up (e.g. passing a smallint where PostgreSQL expects an integer is gladly accepted) but will not cast down automatically (e.g. if a function has an integer argument, passing it a bigint value will fail, even if the value is 1).
- Python float values are converted to PostgreSQL float8.
- Python Decimal values are converted to PostgreSQL numeric.
On the way back, smaller types (int2, int4, float4) are promoted to the larger Python counterpart.
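A quick sketch of this promotion, assuming a conn object as in the previous examples:
conn.execute("SELECT 1::int2, 100000::int4, 1.5::float4").fetchone()
# (1, 100000, 1.5): int2/int4 are loaded as Python int, float4 as Python float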
Note
Sometimes you may prefer to receive numeric data as float instead, for performance reasons or ease of manipulation: you can configure an adapter to cast PostgreSQL numeric to Python float. This of course may imply a loss of precision.
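A minimal sketch of such a configuration (FloatLoader is shipped in psycopg.types.numeric; registering it on the connection affects only that connection):
from psycopg.types.numeric import FloatLoader

# Load PostgreSQL numeric values as Python float on this connection only.
conn.adapters.register_loader("numeric", FloatLoader)

conn.execute("SELECT 123.45::numeric").fetchone()[0]
# 123.45 (a float, not a Decimal)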
Strings adaptation #
See also
Python str are converted to PostgreSQL string syntax, and PostgreSQL types such as text and varchar are converted back to Python str:
conn = psycopg.connect()
conn.execute(
"INSERT INTO menu (id, entry) VALUES (%s, %s)",
(1, "Crème Brûlée at 4.99€"))
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
'Crème Brûlée at 4.99€'
PostgreSQL databases have an encoding, and the session has an encoding too, exposed in the Connection.info.encoding attribute. If your database and connection are in UTF-8 encoding you will likely have no problem, otherwise you will have to make sure that your application only deals with the non-ASCII chars that the database can handle; failing to do so may result in encoding/decoding errors:
# The encoding is set at connection time according to the db configuration
conn.info.encoding
'utf-8'
# The Latin-9 encoding can manage some European accented letters
# and the Euro symbol
conn.execute("SET client_encoding TO LATIN9")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
'Crème Brûlée at 4.99€'
# The Latin-1 encoding doesn't have a representation for the Euro symbol
conn.execute("SET client_encoding TO LATIN1")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
# Traceback (most recent call last)
# ...
# UntranslatableCharacter: character with byte sequence 0xe2 0x82 0xac
# in encoding "UTF8" has no equivalent in encoding "LATIN1"
In rare cases you may have strings with unexpected encodings in the database.
Using the SQL_ASCII client encoding will disable decoding of the data coming from the database, which will be returned as bytes:
conn.execute("SET client_encoding TO SQL_ASCII")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
b'Cr\xc3\xa8me Br\xc3\xbbl\xc3\xa9e at 4.99\xe2\x82\xac'
Alternatively you can cast the unknown encoding data to bytea to retrieve it as bytes, leaving other strings unaltered: see Binary adaptation.
Note that PostgreSQL text cannot contain the 0x00 byte. If you need to store Python strings that may contain binary zeros you should use a bytea field.
Binary adaptation #
Python types representing binary objects (bytes, bytearray, memoryview) are converted by default to bytea fields. By default data received is returned as bytes.
If you are storing large binary data in bytea fields (such as binary documents or images) you should probably use the binary format to pass and return values, otherwise binary data will undergo ASCII escaping , taking some CPU time and more bandwidth. See Binary parameters and results for details.
Date/time types adaptation #
See also
- Python date objects are converted to PostgreSQL date.
- Python datetime objects are converted to PostgreSQL timestamp (if they don't have a tzinfo set) or timestamptz (if they do).
- Python time objects are converted to PostgreSQL time (if they don't have a tzinfo set) or timetz (if they do).
- Python timedelta objects are converted to PostgreSQL interval.
PostgreSQL timestamptz values are returned with a timezone set to the connection TimeZone setting, which is available as a Python ZoneInfo object in the Connection.info.timezone attribute:
>>> conn.info.timezone
zoneinfo.ZoneInfo(key='Europe/London')
>>> conn.execute("select '2048-07-08 12:00'::timestamptz").fetchone()[0]
datetime.datetime(2048, 7, 8, 12, 0, tzinfo=zoneinfo.ZoneInfo(key='Europe/London'))
Note
PostgreSQL timestamptz doesn't store "a timestamp with a timezone attached": it stores a timestamp always in UTC, which is converted, on output, to the connection TimeZone setting:
>>> conn.execute("SET TIMEZONE to 'Europe/Rome'") # UTC+2 in summer
>>> conn.execute("SELECT '2042-07-01 12:00Z'::timestamptz").fetchone()[0] # UTC input
datetime.datetime(2042, 7, 1, 14, 0, tzinfo=zoneinfo.ZoneInfo(key='Europe/Rome'))
Check out the PostgreSQL documentation about timezones for all the details.
JSON adaptation #
Psycopg can map between Python objects and PostgreSQL json/jsonb types, allowing you to customise the load and dump functions used.
Because several Python objects could be considered JSON (dicts, lists, scalars, even date/time if using a dumps function customised to use them), Psycopg requires you to wrap the object to dump as JSON into a wrapper: either psycopg.types.json.Json or Jsonb.
from psycopg.types.json import Jsonb
thing = {"foo": ["bar", 42]}
conn.execute("INSERT INTO mytable VALUES (%s)", [Jsonb(thing)])
By default Psycopg uses the standard library json.dumps and json.loads functions to serialize and de-serialize Python objects to JSON. If you want to customise how serialization happens, for instance changing serialization parameters or using a different JSON library, you can specify your own functions using the psycopg.types.json.set_json_dumps() and set_json_loads() functions, to apply either globally or to a specific context (connection or cursor).
import json
from decimal import Decimal
from functools import partial
from psycopg.types.json import Jsonb, set_json_dumps, set_json_loads
import ujson

# Use a faster dump function
set_json_dumps(ujson.dumps)

# Return floating point values as Decimal, just in one connection
set_json_loads(partial(json.loads, parse_float=Decimal), conn)

conn.execute("SELECT %s", [Jsonb({"value": 123.45})]).fetchone()[0]
# {'value': Decimal('123.45')}
If you need an even more specific dump customisation only for certain objects (including different configurations in the same query) you can specify a dumps parameter in the Json/Jsonb wrapper, which will take precedence over what is specified by set_json_dumps().
from uuid import UUID, uuid4
from psycopg.types.json import Json

class UUIDEncoder(json.JSONEncoder):
    """A JSON encoder which can dump UUID."""
    def default(self, obj):
        if isinstance(obj, UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)

uuid_dumps = partial(json.dumps, cls=UUIDEncoder)
obj = {"uuid": uuid4()}
conn.execute("INSERT INTO objs VALUES (%s)", [Json(obj, dumps=uuid_dumps)])
# will insert: {'uuid': '0a40799d-3980-4c65-8315-2956b18ab0e1'}
Lists adaptation #
Python list objects are adapted to PostgreSQL arrays and back. Only lists containing objects of the same type can be dumped to PostgreSQL (but the list may contain None elements).
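For example, a short sketch (the cast to int[] is only there to make the resulting type explicit):
conn.execute("SELECT %s::int[]", [[10, 20, None]]).fetchone()[0]
# [10, 20, None]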
Note
If you have a list of values which you want to use with the IN operator… don't. It won't work (neither with a list nor with a tuple):
>>> conn.execute("SELECT * FROM mytable WHERE id IN %s", [[10,20,30]])
Traceback (most recent call last):
File "" , line 1, in
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SELECT * FROM mytable WHERE id IN $1
^
What you want to do instead is to use the ‘= ANY()’ expression and pass the values as a list (not a tuple).
>>> conn.execute("SELECT * FROM mytable WHERE id = ANY(%s)", [[10,20,30]])
This also has the advantage of working with an empty list, whereas IN () is not valid SQL.
UUID adaptation #
Python uuid.UUID objects are adapted to the PostgreSQL UUID type and back:
>>> conn.execute("select gen_random_uuid()").fetchone()[0]
UUID('97f0dd62-3bd2-459e-89b8-a5e36ea3c16c')
>>> from uuid import uuid4
>>> conn.execute("select gen_random_uuid() = %s", [uuid4()]).fetchone()[0]
False # long shot
Network data types adaptation #
Objects from the ipaddress module are converted to PostgreSQL network address types:
- IPv4Address, IPv4Interface objects are converted to the PostgreSQL inet type. On the way back, inet values indicating a single address are converted to IPv4Address, otherwise they are converted to IPv4Interface.
- IPv4Network objects are converted to the cidr type and back.
- IPv6Address, IPv6Interface, IPv6Network objects follow the same rules, with IPv6 inet and cidr values.
>>> conn.execute("select '192.168.0.1'::inet, '192.168.0.1/24'::inet").fetchone()
(IPv4Address('192.168.0.1'), IPv4Interface('192.168.0.1/24'))
>>> conn.execute("select '::ffff:1.2.3.0/120'::cidr").fetchone()[0]
IPv6Network('::ffff:102:300/120')
Enum adaptation #
New in version 3.1.
Psycopg can adapt Python Enum subclasses into PostgreSQL enum types (created with the CREATE TYPE ... AS ENUM (...) command).
In order to set up a bidirectional enum mapping, you should get information about the PostgreSQL enum using the EnumInfo class and register it using register_enum(). The behaviour of unregistered and registered enums is different.
- If the enum is not registered with register_enum():
  - Pure Enum classes are dumped as normal strings, using their member names as value. The unknown oid is used, so PostgreSQL should be able to use this string in most contexts (such as an enum or a text field).
    Changed in version 3.1: In previous versions dumping pure enums was not supported and raised a "cannot adapt" error.
  - Mix-in enums are dumped according to their mix-in type (because a class MyIntEnum(int, Enum) is more specifically an int than an Enum, so it's dumped by default according to int rules).
  - PostgreSQL enums are loaded as Python strings. If you want to load arrays of such enums you will have to find their OIDs using types.TypeInfo.fetch() and register them using register().
- If the enum is registered (using EnumInfo.fetch() and register_enum()):
  - Enum classes, both pure and mixed-in, are dumped by name.
  - The registered PostgreSQL enum is loaded back as the registered Python enum members.
- class psycopg.types.enum.EnumInfo(name, oid, array_oid, labels) #
  Manage information about an enum type. EnumInfo is a subclass of TypeInfo: refer to the latter's documentation for generic usage, especially the fetch() method.
  - enum #
    After register_enum() is called, it will contain the Python type mapping to the registered enum.
- psycopg.types.enum.register_enum(info, context=None, enum=None, *, mapping=None) #
  Register the adapters to load and dump an enum type.
  - Parameters:
    - info (EnumInfo) - The object with the information about the enum to register.
    - context (Optional[AdaptContext]) - The context where to register the adapters. If None, register it globally.
    - enum (Optional[Type[E]], with E bound to Enum) - Python enum type matching the PostgreSQL one. If None, a new enum will be generated and exposed as EnumInfo.enum.
    - mapping (Union[Mapping[E, str], Sequence[Tuple[E, str]], None]) - Override the mapping between enum members and info labels.
  After registering, fetching data of the registered enum will cast PostgreSQL enum labels into corresponding Python enum members.
  If no enum is specified, a new Enum is created based on the PostgreSQL enum labels.
Example:
>>> from enum import Enum, auto
>>> from psycopg.types.enum import EnumInfo, register_enum
>>> class UserRole(Enum):
...     ADMIN = auto()
...     EDITOR = auto()
...     GUEST = auto()
>>> conn.execute("CREATE TYPE user_role AS ENUM ('ADMIN', 'EDITOR', 'GUEST')")
>>> info = EnumInfo.fetch(conn, "user_role")
>>> register_enum(info, conn, UserRole)
>>> some_editor = info.enum.EDITOR
>>> some_editor
<UserRole.EDITOR: 2>
>>> conn.execute(
... "SELECT pg_typeof(%(editor)s), %(editor)s",
... {"editor": some_editor}
... ).fetchone()
('user_role', <UserRole.EDITOR: 2>)
>>> conn.execute(
... "SELECT ARRAY[%s, %s]",
... [UserRole.ADMIN, UserRole.GUEST]
... ).fetchone()
[<UserRole.ADMIN: 1>, <UserRole.GUEST: 3>]
If the Python and the PostgreSQL enum don't match 1:1 (for instance if members have a different name, or if more than one Python enum should map to the same PostgreSQL enum, or vice versa), you can specify the exceptions using the mapping parameter.
mapping should be a dictionary with Python enum members as keys and the matching PostgreSQL enum labels as values, or a list of (member, label) pairs with the same meaning (useful when some members are repeated). Order matters: if an element on either side is specified more than once, the last pair in the sequence will take precedence:
# Legacy roles, defined in medieval times.
>>> conn.execute(
... "CREATE TYPE abbey_role AS ENUM ('ABBOT', 'SCRIBE', 'MONK', 'GUEST')")
>>> info = EnumInfo.fetch(conn, "abbey_role")
>>> register_enum(info, conn, UserRole, mapping=[
... (UserRole.ADMIN, "ABBOT"),
... (UserRole.EDITOR, "SCRIBE"),
... (UserRole.EDITOR, "MONK")])
>>> conn.execute("SELECT '{ABBOT,SCRIBE,MONK,GUEST}'::abbey_role[]").fetchone()[0]
[<UserRole.ADMIN: 1>,
<UserRole.EDITOR: 2>,
<UserRole.EDITOR: 2>,
<UserRole.GUEST: 3>]
>>> conn.execute("SELECT %s::text[]", [list(UserRole)]).fetchone()[0]
['ABBOT', 'MONK', 'GUEST']
A particularly useful case is when the PostgreSQL labels match the values of a str-based Enum. In this case it is possible to use something like {m: m.value for m in enum} as mapping:
>>> class LowercaseRole(str, Enum):
...     ADMIN = "admin"
...     EDITOR = "editor"
...     GUEST = "guest"
>>> conn.execute(
... "CREATE TYPE lowercase_role AS ENUM ('admin', 'editor', 'guest')")
>>> info = EnumInfo.fetch(conn, "lowercase_role")
>>> register_enum(
... info, conn, LowercaseRole, mapping={m: m.value for m in LowercaseRole})
>>> conn.execute("SELECT 'editor'::lowercase_role").fetchone()[0]
Adapting other PostgreSQL types #
PostgreSQL offers other data types which don’t map to native Python types. Psycopg offers wrappers and conversion functions to allow their use.
Composite types casting #
Psycopg can adapt PostgreSQL composite types (either created with the CREATE TYPE command or implicitly defined after a table row type) to and from Python tuples, namedtuple, or any other suitable object configured.
Before using a composite type it is necessary to get information about it using the CompositeInfo class and to register it using register_composite().
- class psycopg.types.composite.CompositeInfo(name, oid, array_oid, *, regtype='', field_names, field_types) #
  Manage information about a composite type. CompositeInfo is a TypeInfo subclass: check its documentation for the generic usage, especially the fetch() method.
  - python_type #
    After register_composite() is called, it will contain the python type mapping to the registered composite.
- psycopg.types.composite.register_composite(info, context=None, factory=None) #
  Register the adapters to load and dump a composite type.
  - Parameters:
    - info (CompositeInfo) - The object with the information about the composite to register.
    - context (Optional[AdaptContext]) - The context where to register the adapters. If None, register it globally.
    - factory (Optional[Callable[..., Any]]) - Callable to convert the sequence of attributes read from the composite into a Python object.
  Note
  Registering the adapters doesn't affect objects already created, even if they are children of the registered context. For instance, registering the adapter globally doesn't affect already existing connections.
  After registering, fetching data of the registered composite will invoke factory to create corresponding Python objects.
  If no factory is specified, a namedtuple is created and used to return data.
  If the factory is a type (and not a generic callable), then dumpers for that type are created and registered too, so that passing objects of that type to a query will adapt them to the registered type.
Example:
>>> from psycopg.types.composite import CompositeInfo, register_composite
>>> conn.execute("CREATE TYPE card AS (value int, suit text)")
>>> info = CompositeInfo.fetch(conn, "card")
>>> register_composite(info, conn)
>>> my_card = info.python_type(8, "hearts")
>>> my_card
card(value=8, suit='hearts')
>>> conn.execute(
... "SELECT pg_typeof(%(card)s), (%(card)s).suit", {"card": my_card}
... ).fetchone()
('card', 'hearts')
>>> conn.execute("SELECT (%s, %s)::card", [1, "spades"]).fetchone()[0]
card(value=1, suit='spades')
Nested composite types are handled as expected, provided that the types of the composite components are registered as well:
>>> conn.execute("CREATE TYPE card_back AS (face card, back text)")
>>> info2 = CompositeInfo.fetch(conn, "card_back")
>>> register_composite(info2, conn)
>>> conn.execute("SELECT ((8, 'hearts'), 'blue')::card_back").fetchone()[0]
card_back(face=card(value=8, suit='hearts'), back='blue')
Range adaptation #
PostgreSQL range types are a family of data types representing a range of values between two elements. The type of the element is called the range subtype . PostgreSQL offers a few built-in range types and allows the definition of custom ones.
All the PostgreSQL range types are loaded as the Range Python type, which is a Generic type and can hold bounds of different types.
- class psycopg.types.range.Range(lower=None, upper=None, bounds='[)', empty=False) #
  Python representation for a PostgreSQL range type.
  - Parameters:
    - lower (Optional[T]) - lower bound for the range. None means unbound
    - upper (Optional[T]) - upper bound for the range. None means unbound
    - bounds (str) - one of the literal strings (), [), (], [], representing whether the lower or upper bounds are included
    - empty (bool) - if True, the range is empty
  This Python type is only used to pass and retrieve range values to and from PostgreSQL and doesn't attempt to replicate the PostgreSQL range features: it doesn't perform normalization and doesn't implement all the operators supported by the database.
  PostgreSQL will perform normalisation on Range objects used as query parameters, so, when they are fetched back, they will be found in the normal form (for instance ranges on integers will have [) bounds).
  Range objects are immutable, hashable, and support the in operator (checking if an element is within the range). They can be tested for equivalence. Empty ranges evaluate to False in a boolean context, nonempty ones evaluate to True.
  Range objects have the following attributes:
  - isempty #
    True if the range is empty.
  - lower #
    The lower bound of the range. None if empty or unbound.
  - upper #
    The upper bound of the range. None if empty or unbound.
  - lower_inc #
    True if the lower bound is included in the range.
  - upper_inc #
    True if the upper bound is included in the range.
  - lower_inf #
    True if the range doesn't have a lower bound.
  - upper_inf #
    True if the range doesn't have an upper bound.
The built-in range objects are adapted automatically: if a Range object contains date bounds, it is dumped using the daterange OID, and of course daterange values are loaded back as Range[date].
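For instance, a minimal sketch of the round trip (the dates are chosen arbitrarily):
from datetime import date
from psycopg.types.range import Range

# Dumped using the daterange OID, so range functions resolve as expected...
conn.execute("SELECT upper(%s)", [Range(date(2024, 1, 1), date(2024, 2, 1))]).fetchone()[0]
# datetime.date(2024, 2, 1)

# ...and daterange values are loaded back as Range[date].
conn.execute("SELECT '[2024-01-01,2024-02-01)'::daterange").fetchone()[0]
# Range(datetime.date(2024, 1, 1), datetime.date(2024, 2, 1), '[)')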
If you create your own range type you can use RangeInfo and register_range() to associate the range type with its subtype and make it work like the builtin ones.
- class psycopg.types.range.RangeInfo(name, oid, array_oid, *, regtype='', subtype_oid) #
  Manage information about a range type. RangeInfo is a TypeInfo subclass: check its documentation for generic details, especially the fetch() method.
- psycopg.types.range.register_range(info, context=None) #
  Register the adapters to load and dump a range type.
  - Parameters:
    - info (RangeInfo) - The object with the information about the range to register.
    - context (Optional[AdaptContext]) - The context where to register the adapters. If None, register it globally.
  Register loaders so that loading data of this type will result in a Range with bounds parsed as the right subtype.
  Note
Registering the adapters doesn’t affect objects already created, even if they are children of the registered context. For instance, registering the adapter globally doesn’t affect already existing connections.
Example:
>>> from psycopg.types.range import Range, RangeInfo, register_range
>>> conn.execute("CREATE TYPE strrange AS RANGE (SUBTYPE = text)")
>>> info = RangeInfo.fetch(conn, "strrange")
>>> register_range(info, conn)
>>> conn.execute("SELECT pg_typeof(%s)", [Range("a", "z")]).fetchone()[0]
'strrange'
>>> conn.execute("SELECT '[a,z]'::strrange").fetchone()[0]
Range('a', 'z', '[]')
Multirange adaptation #
Since PostgreSQL 14, every range type is associated with a multirange , a type representing a disjoint set of ranges. A multirange is automatically available for every range, built-in and user-defined.
All the PostgreSQL multirange types are loaded as the Multirange Python type, which is a mutable sequence of Range elements.
- class psycopg.types.multirange.Multirange(items=()) #
  Python representation for a PostgreSQL multirange type.
  - Parameters:
    - items (Iterable[Range[T]]) - Sequence of ranges to initialise the object.
  This Python type is only used to pass and retrieve multirange values to and from PostgreSQL and doesn't attempt to replicate the PostgreSQL multirange features: overlapping items are not merged, empty ranges are not discarded, the items are not ordered, the behaviour of multirange operators is not replicated in Python.
  PostgreSQL will perform normalisation on Multirange objects used as query parameters, so, when they are fetched back, they will be found ordered, with overlapping ranges merged, etc.
  Multirange objects are a MutableSequence and are totally ordered: they behave pretty much like a list of Range. Like Range, they are Generic on the subtype of their range, so you can declare a variable to be Multirange[date] and mypy will complain if you try to add it a Range[Decimal].
Like for Range, built-in multirange objects are adapted automatically: if a Multirange object contains Range with date bounds, it is dumped using the datemultirange OID, and datemultirange values are loaded back as Multirange[date].
If you have created your own range type you can use MultirangeInfo and register_multirange() to associate the resulting multirange type with its subtype and make it work like the builtin ones.
- class psycopg.types.multirange.MultirangeInfo(name, oid, array_oid, *, regtype='', range_oid, subtype_oid) #
  Manage information about a multirange type. MultirangeInfo is a TypeInfo subclass: check its documentation for generic details, especially the fetch() method.
- psycopg.types.multirange.register_multirange(info, context=None) #
  Register the adapters to load and dump a multirange type.
  - Parameters:
    - info (MultirangeInfo) - The object with the information about the range to register.
    - context (Optional[AdaptContext]) - The context where to register the adapters. If None, register it globally.
  Register loaders so that loading data of this type will result in a Range with bounds parsed as the right subtype.
  Note
  Registering the adapters doesn't affect objects already created, even if they are children of the registered context. For instance, registering the adapter globally doesn't affect already existing connections.
Example:
>>> from psycopg.types.multirange import \
... Multirange, MultirangeInfo, register_multirange
>>> from psycopg.types.range import Range
>>> conn.execute("CREATE TYPE strrange AS RANGE (SUBTYPE = text)")
>>> info = MultirangeInfo.fetch(conn, "strmultirange")
>>> register_multirange(info, conn)
>>> rec = conn.execute(
... "SELECT pg_typeof(%(mr)s), %(mr)s",
... {"mr": Multirange([Range("a", "q"), Range("l", "z")])}).fetchone()
>>> rec[0]
'strmultirange'
>>> rec[1]
Multirange([Range('a', 'z', '[)')])
Hstore adaptation #
The hstore data type is a key-value store embedded in PostgreSQL. It supports GiST or GIN indexes allowing search by keys or key/value pairs as well as regular BTree indexes for equality, uniqueness etc.
Psycopg can convert Python dict objects to and from hstore structures. Only dictionaries with string keys and values are supported. None is also allowed as a value but not as a key.
In order to use the hstore data type it is necessary to load it in a database using:
=# CREATE EXTENSION hstore;
Because hstore is distributed as a contrib module, its oid is not well known, so it is necessary to use TypeInfo.fetch() to query the database and get its oid. The resulting object can be passed to register_hstore() to configure dumping dict to hstore and parsing hstore back to dict, in the context where the adapter is registered.
- psycopg.types.hstore.register_hstore(info, context=None) #
  Register the adapters to load and dump hstore.
  - Parameters:
    - info (TypeInfo) - The object with the information about the hstore type.
    - context (Optional[AdaptContext]) - The context where to register the adapters. If None, register it globally.
Note
Registering the adapters doesn’t affect objects already created, even if they are children of the registered context. For instance, registering the adapter globally doesn’t affect already existing connections.
Example:
>>> from psycopg.types import TypeInfo
>>> from psycopg.types.hstore import register_hstore
>>> info = TypeInfo.fetch(conn, "hstore")
>>> register_hstore(info, conn)
>>> conn.execute("SELECT pg_typeof(%s)", [{"a": "b"}]).fetchone()[0]
'hstore'
>>> conn.execute("SELECT 'foo => bar'::hstore").fetchone()[0]
{'foo': 'bar'}
Geometry adaptation using Shapely #
When using the PostGIS extension, it can be useful to retrieve geometry values and have them automatically converted to Shapely instances. Likewise, you may want to store such instances in the database and have the conversion happen automatically.
Warning
Psycopg doesn't have a dependency on the shapely package: you should install the library as an additional dependency of your project.
Warning
This module is experimental and might be changed in the future according to users’ feedback.
Since PostGIS is an extension, the geometry type oid is not well known, so it is necessary to use TypeInfo.fetch() to query the database and find it. The resulting object can be passed to register_shapely() to configure dumping shape instances to geometry columns and parsing geometry data back to shape instances, in the context where the adapters are registered.
- psycopg.types.shapely.register_shapely(info, context=None) #
  Register Shapely dumper and loaders.
  After invoking this function on an adapter, the queries retrieving PostGIS geometry objects will return Shapely's shape object instances both in text and binary mode. Similarly, shape objects can be sent to the database.
  This requires the Shapely library to be installed.
  - Parameters:
    - info - The object with the information about the geometry type.
    - context - The context where to register the adapters. If None, register it globally.
Note
Registering the adapters doesn’t affect objects already created, even if they are children of the registered context. For instance, registering the adapter globally doesn’t affect already existing connections.
Example:
>>> from psycopg.types import TypeInfo
>>> from psycopg.types.shapely import register_shapely
>>> from shapely.geometry import Point
>>> info = TypeInfo.fetch(conn, "geometry")
>>> register_shapely(info, conn)
>>> conn.execute("SELECT pg_typeof(%s)", [Point(1.2, 3.4)]).fetchone()[0]
'geometry'
>>> conn.execute("""
... SELECT ST_GeomFromGeoJSON('{
... "type":"Point",
... "coordinates":[-48.23456,20.12345]}')
... """).fetchone()[0]
Notice that, if the geometry adapters are registered on a specific object (a connection or cursor), other connections and cursors will be unaffected:
>>> conn2 = psycopg.connect(CONN_STR)
>>> conn2.execute("""
... SELECT ST_GeomFromGeoJSON('{
... "type":"Point",
... "coordinates":[-48.23456,20.12345]}')
... """).fetchone()[0]
'0101000020E61000009279E40F061E48C0F2B0506B9A1F3440'
Transactions management #
Psycopg has a behaviour that may seem surprising compared to psql: by default, any database operation will start a new transaction. As a consequence, changes made by any cursor of the connection will not be visible until Connection.commit() is called, and will be discarded by Connection.rollback(). The following operation on the same connection will start a new transaction.
If a database operation fails, the server will refuse further commands, until a rollback() is called.
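For example, a minimal sketch of recovering from a failed statement (the table name is made up; any error leaves the session in a failed state until the rollback):
try:
    conn.execute("SELECT * FROM nonexistent_table")
except psycopg.errors.UndefinedTable:
    # The session refuses further commands until we roll back.
    conn.rollback()

conn.execute("SELECT 1")  # works again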
If the cursor is closed with a transaction open, no COMMIT command is sent to the server, which will then discard the connection. Certain middleware (such as PgBouncer) will also discard a connection left in transaction state, so, if possible you will want to commit or rollback a connection before finishing working with it.
An example of what will likely happen the first time you use Psycopg (and be disappointed by it):
conn = psycopg.connect()
# Creating a cursor doesn't start a transaction or affect the connection
# in any way.
cur = conn.cursor()
cur.execute("SELECT count(*) FROM my_table")
# This function call executes:
# - BEGIN
# - SELECT count(*) FROM my_table
# So now a transaction has started.
# If your program spends a long time in this state, the server will keep
# a connection "idle in transaction", which is likely something undesired
cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
# This statement is executed inside the transaction
conn.close()
# No COMMIT was sent: the INSERT was discarded.
There are a few things going wrong here, let’s see how they can be improved.
One obvious problem after the run above is that, firing up psql, you will see no new record in the table data. One way to fix the problem is to call conn.commit() before closing the connection. Thankfully, if you use the connection context, Psycopg will commit the connection at the end of the block (or roll it back if the block is exited with an exception):
The code modified using a connection context will result in the following sequence of database statements:
with psycopg.connect() as conn:

    cur = conn.cursor()
    cur.execute("SELECT count(*) FROM my_table")
    # This function call executes:
    # - BEGIN
    # - SELECT count(*) FROM my_table
    # So now a transaction has started.

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
    # This statement is executed inside the transaction

# No exception at the end of the block:
# COMMIT is executed.
This way we don't have to remember to call either close() or commit() and the database operations actually have a persistent effect. The code might still do something you don't expect: keep a transaction from the first operation to the connection closure. You can have a finer control over the transactions using an autocommit transaction and/or transaction contexts.
Warning
By default even a simple SELECT will start a transaction: in long-running programs, if no further action is taken, the session will remain idle in transaction, an undesirable condition for several reasons (locks are held by the session, tables bloat…). For long lived scripts, either make sure to terminate a transaction as soon as possible or use an autocommit connection.
Hint
If a database operation fails with an error message such as InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block, it means that a previous operation failed and the database session is in a state of error. You need to call rollback() if you want to keep on using the same connection.
Autocommit transactions #
The manual commit requirement can be suspended using autocommit, either as connection attribute or as connect() parameter. This may be required to run operations that cannot be executed inside a transaction, such as CREATE DATABASE, VACUUM, or CALL on stored procedures using transaction control.
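For instance, a short sketch of running such a command (VACUUM refuses to run inside a transaction block, so the connection must be in autocommit mode; the connection string is a placeholder):
with psycopg.connect("dbname=test", autocommit=True) as conn:
    conn.execute("VACUUM")  # would fail on a non-autocommit connection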
With an autocommit transaction, the above sequence of operations results in:
with psycopg.connect(autocommit=True) as conn:

    cur = conn.cursor()
    cur.execute("SELECT count(*) FROM my_table")
    # This function call now only executes:
    # - SELECT count(*) FROM my_table
    # and no transaction starts.

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
    # The result of this statement is persisted immediately by the database

# The connection is closed at the end of the block but, because it is not
# in a transaction state, no COMMIT is executed.
An autocommit transaction behaves more as someone coming from psql would expect. This has a beneficial performance effect, because less queries are sent and less operations are performed by the database. The statements, however, are not executed in an atomic transaction; if you need to execute certain operations inside a transaction, you can achieve that with an autocommit connection too, using an explicit transaction block .
Transaction contexts #
A more transparent way to make sure that transactions are finalised at the right time is to use with Connection.transaction() to create a transaction context. When the context is entered, a transaction is started; when leaving the context the transaction is committed, or it is rolled back if an exception is raised inside the block.
Continuing the example above, if you want to use an autocommit connection but still wrap selected groups of commands inside an atomic transaction, you can use a transaction() context:
with psycopg.connect(autocommit=True) as conn:

    cur = conn.cursor()
    cur.execute("SELECT count(*) FROM my_table")
    # The connection is autocommit, so no BEGIN executed.

    with conn.transaction():
        # BEGIN is executed, a transaction started

        cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
        cur.execute("INSERT INTO times VALUES (now())")
        # These two operations run atomically in the same transaction

    # COMMIT is executed at the end of the block.
    # The connection is in idle state again.

# The connection is closed at the end of the block.
Note that transaction blocks can also be used with non-autocommit connections: in this case you still need to pay attention to transactions that may be started automatically. If an operation starts an implicit transaction, a transaction() block will only manage a savepoint sub-transaction, leaving the caller to deal with the main transaction, as explained in Transactions management:
conn = psycopg.connect()
cur = conn.cursor()

cur.execute("SELECT count(*) FROM my_table")
# This function call executes:
# - BEGIN
# - SELECT count(*) FROM my_table
# So now a transaction has started.

with conn.transaction():
    # The block starts with a transaction already open, so it will execute
    # - SAVEPOINT

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))

# The block was executing a sub-transaction so on exit it will only run:
# - RELEASE SAVEPOINT
# The transaction is still on.

conn.close()
# No COMMIT was sent: the INSERT was discarded.
If a transaction() block starts when no transaction is active then it will manage a proper transaction. In essence, a transaction context tries to leave a connection in the state it found it, and leaves you to deal with the wider context.
Hint
The interaction between non-autocommit transactions and transaction contexts is probably surprising. Although the non-autocommit default is what’s demanded by the DBAPI, the personal preference of several experienced developers is to:
- use a connection block: with psycopg.connect(...) as conn;
- use an autocommit connection, either passing autocommit=True as connect() parameter or setting the attribute conn.autocommit = True;
- use with conn.transaction() blocks to manage transactions only where needed.
Nested transactions #
Transaction blocks can be also nested (internal transaction blocks are implemented using SAVEPOINT ): an exception raised inside an inner block has a chance of being handled and not completely fail outer operations. The following is an example where a series of operations interact with the database: operations are allowed to fail; at the end we also want to store the number of operations successfully processed.
with conn.transaction() as tx1:
    num_ok = 0
    for operation in operations:
        try:
            with conn.transaction() as tx2:
                unreliable_operation(conn, operation)
        except Exception:
            logger.exception(f"{operation} failed")
        else:
            num_ok += 1

    save_number_of_successes(conn, num_ok)
If unreliable_operation() causes an error, including an operation causing a database error, all its changes will be reverted. The exception bubbles up outside the block: in the example it is intercepted by the try so that the loop can complete. The outermost block is unaffected (unless other errors happen there).
You can also write code to explicitly roll back any currently active transaction block, by raising the Rollback exception. The exception "jumps" to the end of a transaction block, rolling back its transaction but allowing the program execution to continue from there. By default the exception rolls back the innermost transaction block, but any current block can be specified as the target. In the following example, a hypothetical CancelCommand may stop the processing and cancel any operation previously performed, but not entirely committed yet.
from psycopg import Rollback
with conn.transaction() as outer_tx:
for command in commands():
with conn.transaction() as inner_tx:
if isinstance(command, CancelCommand):
raise Rollback(outer_tx)
process_command(command)
# If `Rollback` is raised, it would propagate only up to this block,
# and the program would continue from here with no exception.
Transaction characteristics #
You can set transaction parameters for the transactions that Psycopg handles. They affect the transactions started implicitly by non-autocommit connections and the ones started explicitly by Connection.transaction() for both autocommit and non-autocommit connections. Leaving these parameters as None will use the server’s default behaviour (which is controlled by server settings such as default_transaction_isolation).
In order to set these parameters you can use the connection attributes isolation_level, read_only, deferrable. For async connections you must use the equivalent set_isolation_level() method and similar. The parameters can only be changed if there isn’t a transaction already active on the connection.
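For instance, a minimal sketch (the connection string and table name are illustrative, not from the original documentation):
import psycopg
from psycopg import IsolationLevel

conn = psycopg.connect("dbname=test")

# Characteristics can only be changed while no transaction is active.
conn.isolation_level = IsolationLevel.SERIALIZABLE
conn.read_only = True

with conn.transaction():
    # This transaction runs SERIALIZABLE and READ ONLY.
    conn.execute("SELECT count(*) FROM my_table")

# On an AsyncConnection the equivalent setter methods must be awaited,
# e.g. await aconn.set_isolation_level(IsolationLevel.SERIALIZABLE).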
Warning
Applications running at REPEATABLE_READ or SERIALIZABLE isolation level are exposed to serialization failures. In certain concurrent update cases, PostgreSQL will raise an exception looking like:
psycopg.errors.SerializationFailure: could not serialize access
due to concurrent update
In this case the application must be prepared to repeat the operation that caused the exception.
Two-Phase Commit protocol support #
New in version 3.1.
Psycopg exposes the two-phase commit features available in PostgreSQL implementing the two-phase commit extensions proposed by the DBAPI.
The DBAPI model of two-phase commit is inspired by the XA specification , according to which transaction IDs are formed from three components:
- a format ID (a non-negative 32-bit integer)
- a global transaction ID (a string no longer than 64 bytes)
- a branch qualifier (a string no longer than 64 bytes)
For a particular global transaction, the first two components will be the same for all the resources. Every resource will be assigned a different branch qualifier.
According to the DBAPI specification, a transaction ID is created using the Connection.xid() method. Once you have a transaction id, a distributed transaction can be started with Connection.tpc_begin(), prepared using tpc_prepare() and completed using tpc_commit() or tpc_rollback(). Transaction IDs can also be retrieved from the database using tpc_recover() and completed using the above tpc_commit() and tpc_rollback().
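A minimal sketch of this workflow (the connection string, table, and transaction ID values are illustrative; the server must have max_prepared_transactions set to a non-zero value for prepared transactions to be available):
import psycopg

with psycopg.connect("dbname=test") as conn:
    xid = conn.xid(1, "gtrid-example", "bqual-example")
    conn.tpc_begin(xid)
    conn.execute("INSERT INTO data VALUES (%s)", ("Hello",))
    conn.tpc_prepare()
    # The prepared transaction now survives a crash of this process;
    # a later session could find it with tpc_recover() and complete it.
    conn.tpc_commit()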
PostgreSQL doesn’t follow the XA standard though, and the ID for a PostgreSQL prepared transaction can be any string up to 200 characters long. Psycopg’s Xid objects can represent both XA-style transaction IDs (such as the ones created by the xid() method) and PostgreSQL transaction IDs identified by an unparsed string.
The format in which the Xids are converted into strings passed to the database is the same employed by the PostgreSQL JDBC driver : this should allow interoperation between tools written in Python and in Java. For example a recovery tool written in Python would be able to recognize the components of transactions produced by a Java program.
For further details see the documentation for the Two-Phase Commit support methods .
Using COPY TO and COPY FROM #
Psycopg allows operating with the PostgreSQL COPY protocol. COPY is one of the most efficient ways to load data into the database (and to modify it, with some SQL creativity).
Copy is supported using the Cursor.copy() method, passing it a query of the form COPY ... FROM STDIN or COPY ... TO STDOUT, and managing the resulting Copy object in a with block:
with cursor.copy("COPY table_name (col1, col2) FROM STDIN") as copy:
# pass data to the 'copy' object using write()/write_row()
You can compose a COPY statement dynamically by using objects from the
psycopg.sql
module:
with cursor.copy(
sql.SQL("COPY {} TO STDOUT").format(sql.Identifier("table_name"))
) as copy:
# read data from the 'copy' object using read()/read_row()
Changed in version 3.1: You can also pass parameters to copy(), like in execute():
with cur.copy("COPY (SELECT * FROM table_name LIMIT %s) TO STDOUT", (3,)) as copy:
# expect no more than three records
The connection is subject to the usual transaction behaviour, so, unless the connection is in autocommit, at the end of the COPY operation you will still have to commit the pending changes and you can still roll them back. See Transactions management for details.
Writing data row-by-row #
Using a copy operation you can load data into the database from any Python iterable (a list of tuples, or any iterable of sequences): the Python values are adapted as they would be in normal querying. To perform such an operation use a COPY ... FROM STDIN with Cursor.copy() and use write_row() on the resulting object in a with block. On exiting the block the operation will be concluded:
records = [(10, 20, "hello"), (40, None, "world")]
with cursor.copy("COPY sample (col1, col2, col3) FROM STDIN") as copy:
for record in records:
copy.write_row(record)
If an exception is raised inside the block, the operation is interrupted and the records inserted so far are discarded.
In order to read or write from Copy row-by-row you must not specify COPY options such as FORMAT CSV, DELIMITER, NULL: please leave these details alone, thank you :)
Reading data row-by-row #
You can also do the opposite, reading rows out of a COPY ... TO STDOUT operation, by iterating on rows(). However this is not something you may want to do normally: usually the normal query process will be easier to use.
PostgreSQL, currently, doesn’t give complete type information on COPY TO, so the rows returned will have unparsed data, as strings or bytes, according to the format.
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
for row in copy.rows():
print(row) # return unparsed data: ('10', '2046-12-24')
You can improve the results by using set_types() before reading, but you have to specify them yourself.
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
copy.set_types(["int4", "date"])
for row in copy.rows():
print(row) # (10, datetime.date(2046, 12, 24))
Copying block-by-block #
If data is already formatted in a way suitable for copy (for instance because it is coming from a file resulting from a previous COPY TO operation) it can be loaded into the database using Copy.write() instead.
with open("data", "r") as f:
with cursor.copy("COPY data FROM STDIN") as copy:
while data := f.read(BLOCK_SIZE):
copy.write(data)
In this case you can use any COPY option and format, as long as the input data is compatible with what the operation in copy() expects. Data can be passed as str, if the copy is in FORMAT TEXT, or as bytes, which works with both FORMAT TEXT and FORMAT BINARY.
In order to produce data in COPY format you can use a COPY ... TO STDOUT statement and iterate over the resulting Copy object, which will produce a stream of bytes objects:
with open("data.out", "wb") as f:
with cursor.copy("COPY table_name TO STDOUT") as copy:
for data in copy:
f.write(data)
Binary copy #
Binary copy is supported by specifying FORMAT BINARY in the COPY statement. In order to import binary data using write_row(), all the types passed to the database must have a binary dumper registered; this is not necessary if the data is copied block-by-block using write().
Warning
PostgreSQL is particularly finicky when loading data in binary mode and will apply no cast rules. This means, for example, that passing the value 100 to an integer column will fail, because Psycopg will pass it as a smallint value, and the server will reject it because its size doesn’t match what is expected.
You can work around the problem using the set_types() method of the Copy object, carefully specifying the types to load.
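For instance, a minimal sketch (table and column types are illustrative):
records = [(10, 20, "hello"), (40, None, "world")]

with cursor.copy("COPY sample (col1, col2, col3) FROM STDIN (FORMAT BINARY)") as copy:
    # Declare the column types so the values are dumped in the binary
    # format the server expects (e.g. int4 rather than smallint).
    copy.set_types(["int4", "int4", "text"])
    for record in records:
        copy.write_row(record)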
See also
See Binary parameters and results for further info about binary querying.
Asynchronous copy support #
Asynchronous operations are supported using the same patterns as above, using the objects obtained by an AsyncConnection. For instance, if f is an object supporting an asynchronous read() method returning COPY data, a fully-async copy operation could be:
async with cursor.copy("COPY data FROM STDIN") as copy:
while data := await f.read():
await copy.write(data)
The AsyncCopy object documentation describes the signature of the asynchronous methods and the differences from its sync Copy counterpart.
See also
See Asynchronous operations for further info about using async objects.
Example: copying a table across servers #
In order to copy a table, or a portion of a table, across servers, you can use two COPY operations on two different connections, reading from the first and writing to the second.
with psycopg.connect(dsn_src) as conn1, psycopg.connect(dsn_tgt) as conn2:
with conn1.cursor().copy("COPY src TO STDOUT (FORMAT BINARY)") as copy1:
with conn2.cursor().copy("COPY tgt FROM STDIN (FORMAT BINARY)") as copy2:
for data in copy1:
copy2.write(data)
Using FORMAT BINARY usually gives a performance boost, but it only works if the source and target schema are perfectly identical. If the tables are only compatible (for example, if you are copying an integer field into a bigint destination field) you should omit the BINARY option and perform a text-based copy. See Binary copy for details.
The same pattern can be adapted to use async objects in order to perform an async copy .
Differences from psycopg2 #
Psycopg 3 uses the common DBAPI structure of many other database adapters and tries to behave as close as possible to psycopg2. There are however a few differences to be aware of.
Tip
Most of the time, the workarounds suggested here will work with both Psycopg 2 and 3, which could be useful if you are porting a program or writing one that should work with both.
Server-side binding #
Psycopg 3 sends the query and the parameters to the server separately, instead of merging them on the client side. Server-side binding works for normal SELECT and data manipulation statements (INSERT, UPDATE, DELETE), but it doesn’t work with many other statements. For instance, it doesn’t work with SET or with NOTIFY:
>>> conn.execute("SET TimeZone TO %s", ["UTC"])
Traceback (most recent call last):
...
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SET TimeZone TO $1
^
>>> conn.execute("NOTIFY %s, %s", ["chan", 42])
Traceback (most recent call last):
...
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: NOTIFY $1, $2
^
and with any data definition statement:
>>> conn.execute("CREATE TABLE foo (id int DEFAULT %s)", [42])
Traceback (most recent call last):
...
psycopg.errors.UndefinedParameter: there is no parameter $1
LINE 1: CREATE TABLE foo (id int DEFAULT $1)
^
Sometimes, PostgreSQL offers an alternative: for instance the set_config() function can be used instead of the SET statement, and the pg_notify() function can be used instead of NOTIFY:
>>> conn.execute("SELECT set_config('TimeZone', %s, false)", ["UTC"])
>>> conn.execute("SELECT pg_notify(%s, %s)", ["chan", "42"])
If this is not possible, you must merge the query and the parameter on the client side. You can do so using the psycopg.sql objects:
>>> from psycopg import sql
>>> cur.execute(sql.SQL("CREATE TABLE foo (id int DEFAULT {})").format(42))
or creating a client-side binding cursor such as ClientCursor:
>>> cur = ClientCursor(conn)
>>> cur.execute("CREATE TABLE foo (id int DEFAULT %s)", [42])
If you need ClientCursor often, you can set the Connection.cursor_factory to have them created by default by Connection.cursor(). This way, Psycopg 3 will behave largely the same way as Psycopg 2.
Note that, both server-side and client-side, you can only specify values as parameters (i.e. the strings that go in single quotes). If you need to parametrize different parts of a statement (such as a table name), you must use the psycopg.sql module:
>>> from psycopg import sql
# This will quote the user and the password using the right quotes
# e.g.: ALTER USER "foo" SET PASSWORD 'bar'
>>> conn.execute(
... sql.SQL("ALTER USER {} SET PASSWORD {}")
... .format(sql.Identifier(username), password))
Multiple statements in the same query #
As a consequence of using server-side bindings, when parameters are used, it is not possible to execute several statements in the same execute() call, separated by semicolons:
>>> conn.execute(
... "INSERT INTO foo VALUES (%s); INSERT INTO foo VALUES (%s)",
... (10, 20))
Traceback (most recent call last):
...
psycopg.errors.SyntaxError: cannot insert multiple commands into a prepared statement
One obvious way to work around the problem is to use several execute() calls.
There is no such limitation if no parameters are used. As a consequence, you can compose a query containing multiple statements on the client side and run them all in the same execute() call, using the psycopg.sql objects:
>>> from psycopg import sql
>>> conn.execute(
...     sql.SQL("INSERT INTO foo VALUES ({}); INSERT INTO foo VALUES ({})")
...     .format(10, 20))
or a client-side binding cursor :
>>> cur = psycopg.ClientCursor(conn)
>>> cur.execute(
... "INSERT INTO foo VALUES (%s); INSERT INTO foo VALUES (%s)",
... (10, 20))
Warning
If a statement must be executed outside a transaction (such as CREATE DATABASE), it cannot be executed in batch with other statements, even if the connection is in autocommit mode:
>>> conn.autocommit = True
>>> conn.execute("CREATE DATABASE foo; SELECT 1")
Traceback (most recent call last):
...
psycopg.errors.ActiveSqlTransaction: CREATE DATABASE cannot run inside a transaction block
This happens because PostgreSQL itself will wrap multiple statements in a transaction. Note that you will experience a different behaviour in psql (psql will split the queries on semicolons and send them to the server separately).
This is not new in Psycopg 3: the same limitation is present in psycopg2 too.
Multiple results returned from multiple statements #
If more than one statement returning results is executed in psycopg2, only the result of the last statement is returned:
>>> cur_pg2.execute("SELECT 1; SELECT 2")
>>> cur_pg2.fetchone()
(2,)
In Psycopg 3 instead, all the results are available. After running the query, the first result will be readily available in the cursor and can be consumed using the usual fetch*() methods. In order to access the following results, you can use the Cursor.nextset() method:
>>> cur_pg3.execute("SELECT 1; SELECT 2")
>>> cur_pg3.fetchone()
(1,)
>>> cur_pg3.nextset()
True
>>> cur_pg3.fetchone()
(2,)
>>> cur_pg3.nextset()
None # no more results
Remember though that you cannot use server-side bindings to execute more than one statement in the same query .
Different cast rules #
In rare cases, especially around variadic functions, PostgreSQL might fail to find a function candidate for the given data types:
>>> conn.execute("SELECT json_build_array(%s, %s)", ["foo", "bar"])
Traceback (most recent call last):
...
psycopg.errors.IndeterminateDatatype: could not determine data type of parameter $1
This can be worked around by specifying the argument types explicitly via a cast:
>>> conn.execute("SELECT json_build_array(%s::text, %s::text)", ["foo", "bar"])
You cannot use IN %s with a tuple #
IN cannot be used with a tuple as a single parameter, as was possible with psycopg2:
>>> conn.execute("SELECT * FROM foo WHERE id IN %s", [(10,20,30)])
Traceback (most recent call last):
...
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SELECT * FROM foo WHERE id IN $1
^
What you can do is to use the = ANY() construct and pass the candidate values as a list instead of a tuple, which will be adapted to a PostgreSQL array:
>>> conn.execute("SELECT * FROM foo WHERE id = ANY(%s)", [[10,20,30]])
Note that ANY() can be used with psycopg2 too, and has the advantage of also accepting an empty list of values as argument, which is not supported by the IN operator.
Different adaptation system #
The adaptation system has been completely rewritten, in order to address server-side parameter adaptation, but also with performance, flexibility, and ease of customization in mind.
The default behaviour with builtin data should be what you would expect . If you have customised the way to adapt data, or if you are managing your own extension types, you should look at the new adaptation system .
See also
-
Adapting basic Python types for the basic behaviour.
-
Data adaptation configuration for more advanced use.
Copy is no longer file-based #
psycopg2 exposes a few copy methods to interact with PostgreSQL COPY. Their file-based interface doesn’t make it easy to load dynamically-generated data into a database.
There is now a single copy() method, which is similar to psycopg2 copy_expert() in accepting a free-form COPY command and returns an object to read/write data, block-wise or record-wise. The different usage pattern also enables COPY to be used in async interactions.
See also
See Using COPY TO and COPY FROM for the details.
with connection #
In psycopg2, using the syntax with connection, only the transaction is closed, not the connection. This behaviour is surprising for people used to several other Python classes wrapping resources, such as files.
In Psycopg 3, using with connection will close the connection at the end of the with block, making handling the connection resources more familiar.
In order to manage transactions as blocks you can use the Connection.transaction() method, which allows for finer control, for instance to use nested transactions.
See also
See Transaction contexts for details.
callproc() is gone #
cursor.callproc() is not implemented. The method has a simplistic semantic which doesn’t account for PostgreSQL positional parameters, procedures, set-returning functions… Use a normal execute() with SELECT function_name(...) or CALL procedure_name(...) instead.
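A minimal sketch (the function and procedure names are illustrative):
# Instead of cur.callproc("my_function", (10, 20)):
cur.execute("SELECT my_function(%s, %s)", (10, 20))
result = cur.fetchone()

# Instead of calling a stored procedure via callproc():
cur.execute("CALL my_procedure(%s)", ("hello",))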
client_encoding is gone #
Psycopg automatically uses the database client encoding to decode data to Unicode strings. Use ConnectionInfo.encoding if you need to read the encoding. You can select an encoding at connection time using the client_encoding connection parameter and you can change the encoding of a connection by running a SET client_encoding statement… But why would you?
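A minimal sketch (the connection string is illustrative):
import psycopg

conn = psycopg.connect("dbname=test client_encoding=LATIN1")
print(conn.info.encoding)   # the Python codec name, e.g. 'iso8859-1'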
No default infinity dates handling #
PostgreSQL can represent a much wider range of dates and timestamps than Python. While Python dates are limited to the years between 1 and 9999 (represented by constants such as datetime.date.min and max), PostgreSQL dates extend to BC dates and past the year 10K. Furthermore PostgreSQL can also represent the symbolic dates "infinity", in both directions.
In psycopg2, by default, infinity dates and timestamps map to ‘date.max’ and similar constants. This has the problem of creating a non-bijective mapping (two Postgres dates, infinity and 9999-12-31, both map to the same Python date). There is also the perversity that valid Postgres dates, greater than Python date.max but arguably lesser than infinity, will still overflow.
In Psycopg 3, every date greater than year 9999 will overflow, including infinity. If you would like to customize this mapping (for instance flattening every date past Y10K on date.max) you can subclass and adapt the appropriate loaders: take a look at this example to see how.
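A hedged sketch of the idea, handling only the symbolic "infinity" values in text format (see the linked example for a complete treatment; the class name InfDateLoader is an illustrative choice):
from datetime import date
from psycopg.types.datetime import DateLoader

class InfDateLoader(DateLoader):
    def load(self, data):
        # Flatten the symbolic values onto Python's extremes.
        if data == b"infinity":
            return date.max
        elif data == b"-infinity":
            return date.min
        return super().load(data)

conn.adapters.register_loader("date", InfDateLoader)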
More advanced topics #
Once you have familiarised yourself with the Psycopg basic operations, you can take a look at the chapters of this section for more advanced usages.
Asynchronous operations #
Psycopg Connection and Cursor have counterparts AsyncConnection and AsyncCursor supporting an asyncio interface.
The design of the asynchronous objects is pretty much the same as the sync ones: in order to use them you will only have to scatter the await keyword here and there.
async with await psycopg.AsyncConnection.connect(
"dbname=test user=postgres") as aconn:
async with aconn.cursor() as acur:
await acur.execute(
"INSERT INTO test (num, data) VALUES (%s, %s)",
(100, "abc'def"))
await acur.execute("SELECT * FROM test")
await acur.fetchone()
# will return (1, 100, "abc'def")
async for record in acur:
print(record)
Changed in version 3.1: AsyncConnection.connect() performs DNS name resolution in a non-blocking way.
Warning
Before version 3.1, AsyncConnection.connect() may still block on DNS name resolution. To avoid that you should set the hostaddr connection parameter, or use resolve_hostaddr_async() to do it automatically.
Warning
On Windows, Psycopg is not compatible with the default ProactorEventLoop. Please use a different loop, for instance the SelectorEventLoop.
For instance, you can use, early in your program:
asyncio.set_event_loop_policy(
    asyncio.WindowsSelectorEventLoopPolicy()
)
with async connections #
As seen in the basic usage , connections and cursors can act as context managers, so you can run:
with psycopg.connect("dbname=test user=postgres") as conn:
with conn.cursor() as cur:
cur.execute(...)
# the cursor is closed upon leaving the context
# the transaction is committed, the connection closed
For asynchronous connections it’s almost what you’d expect, but not quite. Please note that connect() and cursor() don’t return a context: they are both factory methods which return an object which can be used as a context. That’s because there are several use cases where it’s useful to handle the objects manually and only close() them when required.
As a consequence you cannot use async with connect(): you have to do it in two steps instead, as in
aconn = await psycopg.AsyncConnection.connect()
async with aconn:
async with aconn.cursor() as cur:
await cur.execute(...)
which can be condensed into async with await:
async with await psycopg.AsyncConnection.connect() as aconn:
async with aconn.cursor() as cur:
await cur.execute(...)
…but no less than that: you still need to do the double async thing.
Note that the AsyncConnection.cursor() function is not an async function (it never performs I/O), so you don’t need an await on it; as a consequence you can use the normal async with context manager.
Interrupting async operations using Ctrl-C #
If a long running operation is interrupted by a Ctrl-C on a normal connection running in the main thread, the operation will be cancelled and the connection will be put in error state, from which it can be recovered with a normal rollback().
If the query is running in an async connection, a Ctrl-C will likely be intercepted by the async loop and interrupt the whole program. In order to emulate what normally happens with blocking connections, you can use asyncio’s add_signal_handler(), to call Connection.cancel():
import asyncio
import signal

async with await psycopg.AsyncConnection.connect() as conn:
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, conn.cancel)
    ...
Server messages #
PostgreSQL can send, together with the query results, informative messages about the operation just performed, such as warnings or debug information. Notices may be raised even if the operations are successful and don’t indicate an error. You are probably familiar with some of them, because they are reported by psql :
$ psql
=# ROLLBACK;
WARNING: there is no transaction in progress
ROLLBACK
Messages can also be sent by the PL/pgSQL ‘RAISE’ statement (at a level lower than EXCEPTION, otherwise the appropriate DatabaseError will be raised). The level of the messages received can be controlled using the client_min_messages setting.
By default, the messages received are ignored. If you want to process them on the client you can use the Connection.add_notice_handler() function to register a function that will be invoked whenever a message is received. The message is passed to the callback as a Diagnostic instance, containing all the information passed by the server, such as the message text and the severity. The object is the same found on the diag attribute of the errors raised by the server:
>>> import psycopg
>>> def log_notice(diag):
... print(f"The server says: {diag.severity} - {diag.message_primary}")
>>> conn = psycopg.connect(autocommit=True)
>>> conn.add_notice_handler(log_notice)
>>> cur = conn.execute("ROLLBACK")
The server says: WARNING - there is no transaction in progress
>>> print(cur.statusmessage)
ROLLBACK
Warning
The Diagnostic object received by the callback should not be used after the callback function terminates, because its data is deallocated after the callbacks have been processed. If you need to use the information later please extract the attributes requested and forward them instead of forwarding the whole Diagnostic object.
Asynchronous notifications #
Psycopg allows asynchronous interaction with other database sessions using the facilities offered by the PostgreSQL commands LISTEN and NOTIFY. Please refer to the PostgreSQL documentation for examples about how to use this form of communication.
Because of the way sessions interact with notifications (see the NOTIFY documentation), you should keep the connection in autocommit mode if you wish to receive or send notifications in a timely manner.
Notifications are received as instances of Notify. If you are reserving a connection only to receive notifications, the simplest way is to consume the Connection.notifies generator. The generator can be stopped using close().
Note
You don’t need an AsyncConnection to handle notifications: a normal blocking Connection is perfectly valid.
The following example will print notifications and stop when one containing the "stop" message is received.
import psycopg
conn = psycopg.connect("", autocommit=True)
conn.execute("LISTEN mychan")
gen = conn.notifies()
for notify in gen:
print(notify)
if notify.payload == "stop":
gen.close()
print("there, I stopped")
If you run some NOTIFY in a psql session:
=# NOTIFY mychan, 'hello';
NOTIFY
=# NOTIFY mychan, 'hey';
NOTIFY
=# NOTIFY mychan, 'stop';
NOTIFY
You may get output from the Python process such as:
Notify(channel='mychan', payload='hello', pid=961823)
Notify(channel='mychan', payload='hey', pid=961823)
Notify(channel='mychan', payload='stop', pid=961823)
there, I stopped
Alternatively, you can use add_notify_handler() to register a callback function, which will be invoked whenever a notification is received, during the normal query processing; you will then be able to use the connection normally. Please note that in this case notifications will not be received immediately, but only during a connection operation, such as a query.
conn.add_notify_handler(lambda n: print(f"got this: {n}"))
# meanwhile in psql...
# =# NOTIFY mychan, 'hey';
# NOTIFY
print(conn.execute("SELECT 1").fetchone())
# got this: Notify(channel='mychan', payload='hey', pid=961823)
# (1,)
Detecting disconnections #
Sometimes it is useful to detect immediately when the connection with the database is lost. One brutal way to do so is to poll a connection in a loop running an endless stream of SELECT 1… Don’t do so: polling is so out of fashion. Besides, it is inefficient (unless what you really want is a client-server generator of ones), it generates useless traffic and will only detect a disconnection with an average delay of half the polling time.
A more efficient and timely way to detect a server disconnection is to create an additional connection and wait for a notification from the OS that this connection has something to say: only then can you run some checks. You can dedicate a thread (or an asyncio task) to wait on this connection: such a thread will perform no activity until awakened by the OS.
In a normal (non asyncio) program you can use the selectors module. Because the Connection implements a fileno() method you can just register it as a file-like object. You can run such code in a dedicated thread (and using a dedicated connection) if the rest of the program happens to have something else to do too.
import selectors
sel = selectors.DefaultSelector()
sel.register(conn, selectors.EVENT_READ)
while True:
if not sel.select(timeout=60.0):
continue # No FD activity detected in one minute
# Activity detected. Is the connection still ok?
try:
conn.execute("SELECT 1")
except psycopg.OperationalError:
# You were disconnected: do something useful such as panicking
logger.error("we lost our database!")
sys.exit(1)
In an asyncio program you can dedicate a Task instead and do something similar using add_reader:
import asyncio
ev = asyncio.Event()
loop = asyncio.get_event_loop()
loop.add_reader(conn.fileno(), ev.set)
while True:
try:
await asyncio.wait_for(ev.wait(), 60.0)
except asyncio.TimeoutError:
continue # No FD activity detected in one minute
# Activity detected. Is the connection still ok?
try:
await conn.execute("SELECT 1")
except psycopg.OperationalError:
# Guess what happened
...
Static Typing #
Psycopg source code is annotated according to PEP 0484 type hints and is checked using the current version of Mypy in --strict mode.
If your application is checked using Mypy too you can make use of Psycopg types to validate the correct use of Psycopg objects and of the data returned by the database.
Generic types #
Psycopg Connection and Cursor objects are Generic objects and support a Row parameter which is the type of the records returned.
By default methods such as Cursor.fetchall() return normal tuples of unknown size and content. As such, the connect() function returns an object of type psycopg.Connection[Tuple[Any, ...]] and Connection.cursor() returns an object of type psycopg.Cursor[Tuple[Any, ...]]. If you are writing generic plumbing code it might be practical to use annotations such as Connection[Any] and Cursor[Any].
conn = psycopg.connect() # type is psycopg.Connection[Tuple[Any, ...]]
cur = conn.cursor() # type is psycopg.Cursor[Tuple[Any, ...]]
rec = cur.fetchone() # type is Optional[Tuple[Any, ...]]
recs = cur.fetchall() # type is List[Tuple[Any, ...]]
Type of rows returned #
If you want to use connections and cursors returning your data as different types, for instance as dictionaries, you can use the row_factory argument of the connect() and the cursor() method, which will control what type of record is returned by the fetch methods of the cursors and annotate the returned objects accordingly. See Row factories for more details.
dconn = psycopg.connect(row_factory=dict_row)
# dconn type is psycopg.Connection[Dict[str, Any]]
dcur = conn.cursor(row_factory=dict_row)
dcur = dconn.cursor()
# dcur type is psycopg.Cursor[Dict[str, Any]] in both cases
drec = dcur.fetchone()
# drec type is Optional[Dict[str, Any]]
Example: returning records as Pydantic models #
Using Pydantic it is possible to enforce static typing at runtime. Using a Pydantic model factory the code can be checked statically using Mypy and querying the database will raise an exception if the rows returned are not compatible with the model.
The following example can be checked with mypy --strict without reporting any issue. Pydantic will also raise a runtime error in case the Person is used with a query that returns incompatible data.
from datetime import date
from typing import Optional
import psycopg
from psycopg.rows import class_row
from pydantic import BaseModel
class Person(BaseModel):
id: int
first_name: str
last_name: str
dob: Optional[date]
def fetch_person(id: int) -> Person:
with psycopg.connect() as conn:
with conn.cursor(row_factory=class_row(Person)) as cur:
cur.execute(
"""
SELECT id, first_name, last_name, dob
FROM (VALUES
(1, 'John', 'Doe', '2000-01-01'::date),
(2, 'Jane', 'White', NULL)
) AS data (id, first_name, last_name, dob)
WHERE id = %(id)s;
""",
{"id": id},
)
obj = cur.fetchone()
# reveal_type(obj) would return 'Optional[Person]' here
if not obj:
raise KeyError(f"person {id} not found")
# reveal_type(obj) would return 'Person' here
return obj
for id in [1, 2]:
p = fetch_person(id)
if p.dob:
print(f"{p.first_name} was born in {p.dob.year}")
else:
print(f"Who knows when {p.first_name} was born")
Checking literal strings in queries #
The execute() method and similar should only receive a literal string as input, according to PEP 675. This means that the query should come from a literal string in your code, not from an arbitrary string expression.
For instance, passing an argument to the query should be done via the second argument to execute(), not by string composition:
def get_record(conn: psycopg.Connection[Any], id: int) -> Any:
cur = conn.execute("SELECT * FROM my_table WHERE id = %s" % id) # BAD!
return cur.fetchone()
# the function should be implemented as:
def get_record(conn: psycopg.Connection[Any], id: int) -> Any:
cur = conn.execute("select * FROM my_table WHERE id = %s", (id,))
return cur.fetchone()
If you are composing a query dynamically you should use the sql.SQL object and similar to safely escape table and field names. The parameter of the SQL() object should be a literal string:
def count_records(conn: psycopg.Connection[Any], table: str) -> int:
query = "SELECT count(*) FROM %s" % table # BAD!
return conn.execute(query).fetchone()[0]
# the function should be implemented as:
def count_records(conn: psycopg.Connection[Any], table: str) -> int:
query = sql.SQL("SELECT count(*) FROM {}").format(sql.Identifier(table))
return conn.execute(query).fetchone()[0]
At the time of writing, no Python static analyzer implements this check (mypy doesn’t implement it; Pyre does, but doesn’t work with psycopg yet). Once type checker support is complete, the above bad statements should be reported as errors.
Row factories #
Cursor’s fetch* methods, by default, return the records received from the database as tuples. This can be changed to better suit the needs of the programmer by using custom row factories.
The module psycopg.rows exposes several row factories ready to be used. For instance, if you want to return your records as dictionaries, you can use dict_row:
>>> from psycopg.rows import dict_row
>>> conn = psycopg.connect(DSN, row_factory=dict_row)
>>> conn.execute("select 'John Doe' as name, 33 as age").fetchone()
{'name': 'John Doe', 'age': 33}
The row_factory parameter is supported by the connect() method and the cursor() method. Later usage of row_factory overrides a previous one. It is also possible to change the Connection.row_factory or Cursor.row_factory attributes to change what they return:
>>> cur = conn.cursor(row_factory=dict_row)
>>> cur.execute("select 'John Doe' as name, 33 as age").fetchone()
{'name': 'John Doe', 'age': 33}
>>> from psycopg.rows import namedtuple_row
>>> cur.row_factory = namedtuple_row
>>> cur.execute("select 'John Doe' as name, 33 as age").fetchone()
Row(name='John Doe', age=33)
If you want to return objects of your choice you can use a row factory generator, for instance class_row or args_row, or you can write your own row factory:
>>> from dataclasses import dataclass
>>> @dataclass
... class Person:
... name: str
... age: int
... weight: Optional[int] = None
>>> from psycopg.rows import class_row
>>> cur = conn.cursor(row_factory=class_row(Person))
>>> cur.execute("select 'John Doe' as name, 33 as age").fetchone()
Person(name='John Doe', age=33, weight=None)
Creating new row factories #
A row factory is a callable that accepts a Cursor object and returns another callable, a row maker, which takes raw data (as a sequence of values) and returns the desired object.
The role of the row factory is to inspect a query result (it is called after a query is executed and properties such as description and pgresult are available on the cursor) and to prepare a callable which is efficient to call repeatedly (because, for instance, the names of the columns are extracted, sanitised, and stored in local variables).
Formally, these objects are represented by the RowFactory and RowMaker protocols.
RowFactory objects can be implemented as a class, for instance:
from typing import Any, Sequence
from psycopg import Cursor
class DictRowFactory:
def __init__(self, cursor: Cursor[Any]):
self.fields = [c.name for c in cursor.description]
def __call__(self, values: Sequence[Any]) -> dict[str, Any]:
return dict(zip(self.fields, values))
or as a plain function:
from psycopg.rows import RowMaker

def dict_row_factory(cursor: Cursor[Any]) -> RowMaker[dict[str, Any]]:
fields = [c.name for c in cursor.description]
def make_row(values: Sequence[Any]) -> dict[str, Any]:
return dict(zip(fields, values))
return make_row
These can then be used by specifying a row_factory argument in Connection.connect(), Connection.cursor(), or by setting the Connection.row_factory attribute.
conn = psycopg.connect(row_factory=DictRowFactory)
cur = conn.execute("SELECT first_name, last_name, age FROM persons")
person = cur.fetchone()
print(f"{person['first_name']} {person['last_name']}")
Connection pools #
A connection pool is an object managing a set of connections and allowing their use in functions needing one. Because the time to establish a new connection can be relatively long, keeping connections open can reduce latency.
This page explains a few basic concepts of Psycopg connection pool’s behaviour. Please refer to the ConnectionPool object API for details about the pool operations.
Note
The connection pool objects are distributed in a package separate from the main psycopg package: use pip install "psycopg[pool]" or pip install psycopg_pool to make the psycopg_pool package available. See Installing the connection pool.
Pool life cycle #
A simple way to use the pool is to create a single instance of it, as a global object, and to use this object in the rest of the program, allowing other functions, modules, threads to use it:
# module db.py in your program
from psycopg_pool import ConnectionPool
pool = ConnectionPool(conninfo, **kwargs)
# the pool starts connecting immediately.
# in another module
from .db import pool
def my_function():
with pool.connection() as conn:
conn.execute(...)
Ideally you may want to call close() when the use of the pool is finished. Failing to call close() at the end of the program is not terribly bad: probably it will just result in some warnings printed on stderr. However, if you think that it’s sloppy, you could use the atexit module to have close() called at the end of the program.
If you want to avoid starting to connect to the database at import time, and want to wait for the application to be ready, you can create the pool using open=False, and call the open() and close() methods when the conditions are right. Certain frameworks provide callbacks triggered when the program is started and stopped (for instance FastAPI startup/shutdown events): they are perfect to initiate and terminate the pool operations:
pool = ConnectionPool(conninfo, open=False, **kwargs)
@app.on_event("startup")
def open_pool():
pool.open()
@app.on_event("shutdown")
def close_pool():
pool.close()
Creating a single pool as a global variable is not mandatory: your program can create more than one pool, which might be useful to connect to more than one database, or to provide different types of connections, for instance to provide separate read/write and read-only connections. The pool also acts as a context manager and is opened and closed, if necessary, on entering and exiting the context block:
from psycopg_pool import ConnectionPool
with ConnectionPool(conninfo, **kwargs) as pool:
run_app(pool)
# the pool is now closed
When the pool is open, the pool’s background workers start creating the requested min_size connections, while the constructor (or the open() method) returns immediately. This allows the program some leeway to start before the target database is up and running. However, if your application is misconfigured, or the network is down, it means that the program will be able to start, but the threads requesting a connection will fail with a PoolTimeout only after the timeout on connection() has expired. If this behaviour is not desirable (and you prefer your program to crash hard and fast, if the surrounding conditions are not right, because something else will respawn it) you should call the wait() method after creating the pool, or call open(wait=True): these methods will block until the pool is full, or will raise a PoolTimeout exception if the pool isn’t ready within the allocated time.
Connections life cycle #
The pool background workers create connections according to the parameters conninfo, kwargs, and connection_class passed to the ConnectionPool constructor, invoking something like connection_class(conninfo, **kwargs). Once a connection is created it is also passed to the configure() callback, if provided, after which it is put in the pool (or passed to a client requesting it, if someone is already knocking at the door).
If a connection expires (it passes max_lifetime), or is returned to the pool in broken state, or is found closed by check(), then the pool will dispose of it and will start a new connection attempt in the background.
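A minimal sketch of a configure() callback (the connection string and the session setting chosen are illustrative):
from psycopg_pool import ConnectionPool

def configure(conn):
    # Run once on each new connection before it is made available.
    conn.execute("SET timezone TO 'UTC'")
    conn.commit()

pool = ConnectionPool("dbname=test", configure=configure, max_lifetime=600)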
Using connections from the pool #
The pool can be used to request connections from multiple threads or concurrent tasks - it is hardly useful otherwise! If more connections than the ones available in the pool are requested, the requesting threads are queued and are served a connection as soon as one is available, either because another client has finished using it or because the pool is allowed to grow (when max_size > min_size) and a new connection is ready.
The main way to use the pool is to obtain a connection using the connection() context, which returns a Connection or subclass:
with my_pool.connection() as conn:
conn.execute("what you want")
The connection() context behaves like the Connection object context: at the end of the block, if there is a transaction open, it will be committed, or rolled back if the context is exited with an exception.
At the end of the block the connection is returned to the pool and shouldn’t be used anymore by the code which obtained it. If a reset() function is specified in the pool constructor, it is called on the connection before returning it to the pool. Note that the reset() function is called in a worker thread, so that the thread which used the connection can keep its execution without being slowed down by it.
Pool connection and sizing #
A pool can have a fixed size (specifying no max_size, or max_size = min_size) or a dynamic size (when max_size > min_size). In both cases, as soon as the pool is created, it will try to acquire min_size connections in the background.
If an attempt to create a connection fails, a new attempt will be made soon after, using an exponential backoff to increase the time between attempts, until a maximum of reconnect_timeout is reached. When that happens, the pool will call the reconnect_failed() function, if provided to the pool, and just start a new connection attempt. You can use this function either to send alerts or to interrupt the program and allow the rest of your infrastructure to restart it.
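A minimal sketch of a reconnect_failed callback (the connection string and the logging choice are illustrative):
import logging
from psycopg_pool import ConnectionPool

def reconnect_failed(pool):
    # Called after reconnect_timeout seconds of failed connection attempts.
    logging.error("pool %r could not connect to the database", pool.name)

pool = ConnectionPool("dbname=test", reconnect_timeout=300.0, reconnect_failed=reconnect_failed)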
If more than min_size connections are requested concurrently, new ones are created, up to max_size. Note that the connections are always created by the background workers, not by the thread asking for the connection: if a client requests a new connection, and a previous client terminates its job before the new connection is ready, the waiting client will be served the existing connection. This is especially useful in scenarios where the time to establish a connection dominates the time for which the connection is used (see this analysis, for instance).
If a pool grows above min_size, but its usage decreases afterwards, a number of connections are eventually closed: one every time a connection is unused after the max_idle time specified in the pool constructor.
What’s the right size for the pool? #
Big question. Who knows. However, probably not as large as you imagine. Please take a look at this analysis for some ideas.
Something useful you can do is probably to use the get_stats() method and monitor the behaviour of your program to tune the configuration parameters. The size of the pool can also be changed at runtime using the resize() method.
Null connection pools #
New in version 3.1.
Sometimes you may want to leave the choice of using or not using a connection pool as a configuration parameter of your application. For instance, you might want to use a pool if you are deploying a "large instance" of your application and can dedicate a handful of connections to it; conversely you might not want to use it if you deploy the application in several instances, behind a load balancer, and/or using an external connection pool process such as PgBouncer.
Switching between using or not using a pool requires some code change, because the ConnectionPool API is different from the normal connect() function and because the pool can perform additional connection configuration (in the configure parameter) that, if the pool is removed, should be performed in some different code path of your application.
The psycopg_pool 3.1 package introduces the NullConnectionPool class. This class has the same interface, and largely the same behaviour, as the ConnectionPool, but doesn’t create any connection beforehand. When a connection is returned, unless there are other clients already waiting, it is closed immediately and not kept in the pool state.
A null pool is not only a configuration convenience, but can also be used to regulate the access to the server by a client program. If max_size is set to a value greater than 0, the pool will make sure that no more than max_size connections are created at any given time. If more clients ask for further connections, they will be queued and served a connection as soon as a previous client has finished using it, like for the basic pool. Other mechanisms to throttle client requests (such as timeout or max_waiting) are respected too.
Note
Queued clients will be handed an already established connection, as soon as a previous client has finished using it (and after the pool has returned it to idle state and called reset() on it, if necessary).
Because normally (i.e. unless queued) every client will be served a new connection, the time to obtain the connection is paid by the waiting client; background workers are not normally involved in obtaining new connections.
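A minimal sketch of switching between the two pool classes with a configuration flag (the use_pool flag and the connection string are illustrative):
from psycopg_pool import ConnectionPool, NullConnectionPool

use_pool = True  # e.g. read from your application configuration

pool_class = ConnectionPool if use_pool else NullConnectionPool
pool = pool_class("dbname=test", max_size=4)

with pool.connection() as conn:
    conn.execute("SELECT 1")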
Connection quality #
The state of the connection is verified when a connection is returned to the pool: if a connection is broken during its usage it will be discarded on return and a new connection will be created.
Warning
The health of the connection is not checked when the pool gives it to a client.
Why not? Because doing so would require an extra network roundtrip: we want to save you from its latency. Before getting too angry about it, just think that the connection can be lost any moment while your program is using it. As your program should already be able to cope with a loss of a connection during its process, it should be able to tolerate being served a broken connection: unpleasant but not the end of the world.
Warning
The health of the connection is not checked when the connection is in the pool.
Does the pool keep a watchful eye on the quality of the connections inside it? No, it doesn’t. Why not? Because you will do it for us! Your program is only a big ruse to make sure the connections are still alive…
Not (entirely) trolling: if you are using a connection pool, we assume that you are using and returning connections at a good pace. If the pool had to check for the quality of a broken connection before your program notices it, it should be polling each connection even faster than your program uses them. Your database server wouldn’t be amused…
Can you do something better than that? Of course you can, there is always a better way than polling. You can use the same recipe as in Detecting disconnections, reserving a connection and using a thread to monitor for any activity happening on it. If any activity is detected, you can call the pool check() method, which will run a quick check on each connection in the pool, removing the ones found in broken state, and using the background workers to replace them with fresh ones.
If you set up a similar check in your program, in case the database connection is temporarily lost, we cannot do anything for the threads which had already taken a connection from the pool, but no other thread should be served a broken connection, because check() would empty the pool and refill it with working connections, as soon as they are available.
Faster than you can say poll. Or pool.
Pool stats #
The pool can return information about its usage using the methods get_stats() or pop_stats(). Both methods return the same values, but the latter resets the counters after its use. The values can be sent to a monitoring system such as Graphite or Prometheus.
The following values should be provided, but please don’t consider them as a rigid interface: it is possible that they might change in the future. Keys whose value is 0 may not be returned.
Metric | Meaning
---|---
pool_min | Current value for min_size
pool_max | Current value for max_size
pool_size | Number of connections currently managed by the pool (in the pool, given to clients, being prepared)
pool_available | Number of connections currently idle in the pool
requests_waiting | Number of requests currently waiting in a queue to receive a connection
usage_ms | Total usage time of the connections outside the pool
requests_num | Number of connections requested to the pool
requests_queued | Number of requests queued because a connection wasn’t immediately available in the pool
requests_wait_ms | Total time in the queue for the clients waiting
requests_errors | Number of connection requests resulting in an error (timeouts, queue full…)
returns_bad | Number of connections returned to the pool in a bad state
connections_num | Number of connection attempts made by the pool to the server
connections_ms | Total time spent to establish connections with the server
connections_errors | Number of failed connection attempts
connections_lost | Number of connections lost identified by check()
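A minimal sketch of collecting the metrics periodically (assuming an existing pool object; the logging destination is illustrative):
import logging

stats = pool.pop_stats()   # same keys as get_stats(), but resets the counters
logging.info("pool stats: %s", stats)
# e.g. {'pool_min': 4, 'pool_max': 4, 'pool_size': 4, 'requests_num': 104, ...}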
Cursor types #
Psycopg can manage different kinds of "cursors", which differ in where the state of a query being processed is stored: Client-side cursors and Server-side cursors.
Client-side cursors #
Client-side cursors are what Psycopg uses in its normal querying process. They are implemented by the Cursor and AsyncCursor classes. In this querying pattern, after a cursor sends a query to the server (usually calling execute()), the server replies transferring to the client the whole set of results requested, which is stored in the state of the same cursor and from where it can be read by Python code (using methods such as fetchone() and siblings).
This querying process is very scalable because, after a query result has been transmitted to the client, the server doesn’t keep any state. Because the results are already in the client memory, iterating its rows is very quick.
The downside of this querying method is that the entire result has to be transmitted completely to the client (with a time proportional to its size) and the client needs enough memory to hold it, so it is only suitable for reasonably small result sets.
Client-side-binding cursors #
New in version 3.1.
The previously described client-side cursors send the query and the parameters separately to the server. This is the most efficient way to process parametrised queries and allows building several features and optimizations. However, not all types of queries can be bound server-side; in particular no Data Definition Language query can. See Server-side binding for the description of these problems.
The ClientCursor (and its AsyncClientCursor async counterpart) merges the query on the client and sends the query and the parameters merged together to the server. This allows parametrizing any type of PostgreSQL statement, not only queries (SELECT) and Data Manipulation statements (INSERT, UPDATE, DELETE).
Using ClientCursor, Psycopg 3 behaviour will be more similar to psycopg2 (which only implements client-side binding) and could be useful to port Psycopg 2 programs more easily to Psycopg 3. The objects in the sql module allow for greater flexibility (for instance to parametrize a table name too, not only values); however, for simple cases, a ClientCursor could be the right object.
In order to obtain ClientCursor from a connection, you can set its cursor_factory (at init time or by changing its attribute afterwards):
from psycopg import connect, ClientCursor
conn = connect(DSN, cursor_factory=ClientCursor)
cur = conn.cursor()
# cur is a ClientCursor
If you need to create a one-off client-side-binding cursor out of a normal connection, you can just use the ClientCursor class passing the connection as argument.
conn = psycopg.connect(DSN)
cur = psycopg.ClientCursor(conn)
Warning
Client-side cursors don’t support binary parameters and return values and don’t support prepared statements .
Tip
The best use for client-side binding cursors is probably to port large Psycopg 2 code to Psycopg 3, especially for programs making wide use of Data Definition Language statements.
The psycopg.sql module allows for more generic client-side query composition, to mix client- and server-side parameter binding, and allows parametrizing table and field names too, or entirely generic SQL snippets.
Server-side cursors #
PostgreSQL has its own concept of cursor too (sometimes also called portal ). When a database cursor is created, the query is not necessarily completely processed: the server might be able to produce results only as they are needed. Only the results requested are transmitted to the client: if the query result is very large but the client only needs the first few records it is possible to transmit only them.
The downside is that the server needs to keep track of the partially processed results, so it uses more memory and resources on the server.
Psycopg allows the use of server-side cursors using the classes ServerCursor and AsyncServerCursor. They are usually created by passing the name parameter to the cursor() method (reason for which, in psycopg2, they are usually called named cursors). The use of these classes is similar to their client-side counterparts: their interface is the same, but behind the scenes they send commands to control the state of the cursor on the server (for instance when fetching new records or when moving using scroll()).
Using a server-side cursor it is possible to process datasets larger than what would fit in the client’s memory. However for small queries they are less efficient because it takes more commands to receive their result, so you should use them only if you need to process huge results or if only a partial result is needed.
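A minimal sketch (the table name and the per-record processing function are illustrative):
with conn.cursor(name="big_scan") as cur:
    cur.execute("SELECT * FROM big_table")
    for record in cur:      # records are fetched from the server in batches
        process(record)     # hypothetical per-record processing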
See also
Server-side cursors are created and managed by ServerCursor using SQL commands such as DECLARE, FETCH, MOVE. The PostgreSQL documentation gives a good idea of what is possible to do with them.
"Stealing" an existing cursor #
A Psycopg
ServerCursor
can be also used to consume a cursor which was
created in other ways than the
DECLARE
that
ServerCursor.execute()
runs behind the scene.
For instance if you have a PL/pgSQL function returning a cursor :
CREATE FUNCTION reffunc(refcursor) RETURNS refcursor AS $$
BEGIN
OPEN $1 FOR SELECT col FROM test;
RETURN $1;
END;
$$ LANGUAGE plpgsql;
you can run a one-off command in the same connection to call it (e.g. using Connection.execute()) in order to create the cursor on the server:
conn.execute("SELECT reffunc('curname')")
after which you can create a server-side cursor declared by the same name, and directly call the fetch methods, skipping the execute() call:
cur = conn.cursor('curname')
# no cur.execute()
for record in cur:  # or cur.fetchone(), cur.fetchmany()...
    ...  # do something with record
Data adaptation configuration #
The adaptation system is at the core of Psycopg and allows customising the way Python objects are converted to PostgreSQL when a query is performed and how PostgreSQL values are converted to Python objects when query results are returned.
Note
For a high-level view of the conversion of types between Python and PostgreSQL please look at Passing parameters to SQL queries . Using the objects described in this page is useful if you intend to customise the adaptation rules.
-
Adaptation configuration is performed by changing the adapters object of objects implementing the AdaptContext protocol, for instance Connection or Cursor .
-
Every context object derived from another context inherits its adapters mapping: cursors created from a connection inherit the connection’s configuration.
By default, connections obtain an adapters map from the global map exposed as psycopg.adapters : changing the content of this object will affect every connection created afterwards. You may specify a different template adapters map using the context parameter on connect() .
-
The adapters attributes are AdaptersMap instances, and contain the mapping from Python types to Dumper classes, and from PostgreSQL OIDs to Loader classes. Changing this mapping (e.g. writing and registering your own adapters, or using a different configuration of builtin adapters) affects how types are converted between Python and PostgreSQL.
-
Dumpers (objects implementing the Dumper protocol) are the objects used to perform the conversion from a Python object to a bytes sequence in a format understood by PostgreSQL. The string returned shouldn’t be quoted: the value will be passed to the database using functions such as PQexecParams() so quoting and quotes escaping is not necessary. The dumper usually also suggests to the server what type to use, via its oid attribute.
-
Loaders (objects implementing the Loader protocol) are the objects used to perform the opposite operation: reading a bytes sequence from PostgreSQL and creating a Python object out of it.
-
Dumpers and loaders are instantiated on demand by a Transformer object when a query is executed.
Note
Changing adapters in a context only affects that context and its children objects created afterwards ; the objects already created are not affected. For instance, changing the global context will only change newly created connections, not the ones already existing.
Writing a custom adapter: XML #
Psycopg doesn’t provide adapters for the XML data type, because there are just
too many ways of handling XML in Python. Creating a loader to parse the
PostgreSQL xml type
to
ElementTree
is very simple, using the
psycopg.adapt.Loader
base class and implementing the
load()
method:
>>> import xml.etree.ElementTree as ET
>>> from psycopg.adapt import Loader

>>> # Create a class implementing the `load()` method.
>>> class XmlLoader(Loader):
...     def load(self, data):
...         return ET.fromstring(data)
>>> # Register the loader on the adapters of a context.
>>> conn.adapters.register_loader("xml", XmlLoader)
>>> # Now just query the database returning XML data.
>>> cur = conn.execute(
...     """select XMLPARSE (DOCUMENT '<?xml version="1.0"?>
...            <book><title>Manual</title></book>')
...     """)
>>> elem = cur.fetchone()[0]
>>> elem
<Element 'book' at 0x...>
The opposite operation, converting Python objects to PostgreSQL, is performed by dumpers. The psycopg.adapt.Dumper base class makes it easy to implement one: you only need to implement the dump() method:
>>> from psycopg.adapt import Dumper

>>> class XmlDumper(Dumper):
...     # Setting an OID is not necessary but can be helpful
...     oid = psycopg.adapters.types["xml"].oid
...
...     def dump(self, elem):
...         return ET.tostring(elem)
>>> # Register the dumper on the adapters of a context
>>> conn.adapters.register_dumper(ET.Element, XmlDumper)
>>> # Now, in that context, it is possible to use ET.Element objects as parameters
>>> conn.execute("SELECT xpath('//title/text()', %s)", [elem]).fetchone()[0]
['Manual']
Note that it is possible to use a TypesRegistry , exposed by any AdaptContext , to obtain information on builtin types, or extension types if they have been registered on that context using the TypeInfo.register() method.
Example: PostgreSQL numeric to Python float #
Normally PostgreSQL numeric values are converted to Python Decimal instances, because both types allow fixed-precision arithmetic and are not subject to rounding.
Sometimes, however, you may want to perform floating-point math on numeric values, and Decimal may get in the way (maybe because it is slower, or because mixing float and Decimal values causes Python errors).
If you are fine with the potential loss of precision and simply want to receive numeric values as Python float , you can register on numeric the same Loader class used to load float4 / float8 values. Because the PostgreSQL textual representation of both floats and decimals is the same, the two loaders are compatible.
conn = psycopg.connect()
conn.execute("SELECT 123.45").fetchone()[0]
# Decimal('123.45')
conn.adapters.register_loader("numeric", psycopg.types.numeric.FloatLoader)
conn.execute("SELECT 123.45").fetchone()[0]
# 123.45
In this example the customised adaptation takes effect only on the connection conn and on any cursor created from it, not on other connections.
Example: handling infinity date #
Suppose you want to work with the "infinity" date which is available in PostgreSQL but not handled by Python:
>>> conn.execute("SELECT 'infinity'::date").fetchone()
Traceback (most recent call last):
...
DataError: date too large (after year 10K): 'infinity'
One possibility would be to store Python’s datetime.date.max as PostgreSQL infinity. For this, let’s create a subclass for the dumper and the loader and register them in the working scope (globally, or just on a connection or cursor):
from datetime import date

# Subclass existing adapters so that the base case is handled normally.
from psycopg.types.datetime import DateLoader, DateDumper

class InfDateDumper(DateDumper):
    def dump(self, obj):
        if obj == date.max:
            return b"infinity"
        elif obj == date.min:
            return b"-infinity"
        else:
            return super().dump(obj)

class InfDateLoader(DateLoader):
    def load(self, data):
        if data == b"infinity":
            return date.max
        elif data == b"-infinity":
            return date.min
        else:
            return super().load(data)

# The new classes can be registered globally, on a connection, on a cursor
cur.adapters.register_dumper(date, InfDateDumper)
cur.adapters.register_loader("date", InfDateLoader)

cur.execute("SELECT %s::text, %s::text", [date(2020, 12, 31), date.max]).fetchone()
# ('2020-12-31', 'infinity')
cur.execute("SELECT '2020-12-31'::date, 'infinity'::date").fetchone()
# (datetime.date(2020, 12, 31), datetime.date(9999, 12, 31))
Dumpers and loaders life cycle #
Registering dumpers and loaders will instruct Psycopg to use them in the queries to follow, in the context where they have been registered.
When a query is performed on a Cursor , a Transformer object is created as a local context to manage adaptation during the query, instantiating the required dumpers and loaders and dispatching the values to perform the wanted conversions from Python to Postgres and back.
-
The Transformer copies the adapters configuration from the Cursor , thus inheriting all the changes made to the global psycopg.adapters configuration, the current Connection , the Cursor .
-
For every Python type passed as query argument, the Transformer will instantiate a Dumper . Usually all the objects of the same type will be converted by the same dumper instance.
-
According to the placeholder used ( %s , %b , %t ), Psycopg may pick a binary or a text dumper. When using the %s "AUTO" format, if the same type has both a text and a binary dumper registered, the last one registered by register_dumper() will be used.
-
Sometimes, just looking at the Python type is not enough to decide the best PostgreSQL type to use (for instance the PostgreSQL type of a Python list depends on the objects it contains, whether to use an integer or bigint depends on the number size…). In these cases the mechanism provided by get_key() and upgrade() is used to create more specific dumpers.
-
The query is executed. Upon successful request, the result is received as a PGresult .
-
For every OID returned by the query, the Transformer will instantiate a Loader . All the values with the same OID will be converted by the same loader instance.
-
Recursive types (e.g. Python lists, PostgreSQL arrays and composite types) will use the same adaptation rules.
As a consequence it is possible to perform certain choices only once per query (e.g. looking up the connection encoding) and then call a fast-path operation for each value to convert.
Querying will fail if a Python object for which there isn’t a Dumper registered (for the right Format ) is used as query parameter. If the query returns a data type whose OID doesn’t have a Loader , the value will be returned as a string (or bytes string for binary types).
Prepared statements #
Psycopg uses an automatic system to manage prepared statements . When a query is prepared, its parsing and planning is stored in the server session, so that further executions of the same query on the same connection (even with different parameters) are optimised.
A query is prepared automatically after it is executed more than prepare_threshold times on a connection. psycopg will make sure that no more than prepared_max statements are planned: if further queries are executed, the least recently used ones are deallocated and the associated resources freed.
Statement preparation can be controlled in several ways:
-
You can decide to prepare a query immediately by passing prepare=True to Connection.execute() or Cursor.execute() . The query is prepared, if it wasn’t already, and executed as prepared from its first use.
-
Conversely, passing prepare=False to execute() will avoid preparing the query, regardless of the number of times it is executed. The default for the parameter is None , meaning that the query is prepared if the conditions described above are met.
-
You can disable the use of prepared statements on a connection by setting its prepare_threshold attribute to None (a short sketch of these options follows this list).
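A minimal sketch of the three options above, assuming an open connection conn (the queries are illustrative):

# Prepare this statement immediately, without waiting for the threshold.
conn.execute("SELECT %s::int", [10], prepare=True)

# Never prepare this statement, no matter how many times it runs.
conn.execute("SELECT %s::int", [10], prepare=False)

# Disable prepared statements on this connection altogether.
conn.prepare_threshold = None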
Changed in version 3.1: You can set prepare_threshold as a connect() keyword parameter too.
See also
The PREPARE PostgreSQL documentation contains plenty of details about prepared statements in PostgreSQL.
Note however that Psycopg doesn’t use SQL statements such as PREPARE and EXECUTE , but protocol level commands such as the ones exposed by PQsendPrepare , PQsendQueryPrepared .
Warning
Using external connection poolers, such as PgBouncer, is not compatible with prepared statements, because the same client connection may change the server session it refers to. If such middleware is used you should disable prepared statements, by setting the Connection.prepare_threshold attribute to None .
Pipeline mode support #
New in version 3.1.
The pipeline mode allows PostgreSQL client applications to send a query without having to read the result of the previously sent query. Taking advantage of the pipeline mode, a client will wait less for the server, since multiple queries/results can be sent/received in a single network roundtrip. Pipeline mode can provide a significant performance boost to the application.
Pipeline mode is most useful when the server is distant, i.e., network latency ("ping time") is high, and also when many small operations are being performed in rapid succession. There is usually less benefit in using pipelined commands when each query takes many multiples of the client/server round-trip time to execute. A 100-statement operation run on a server 300 ms round-trip-time away would take 30 seconds in network latency alone without pipelining; with pipelining it may spend as little as 0.3 s waiting for results from the server.
The server executes statements, and returns results, in the order the client sends them. The server will begin executing the commands in the pipeline immediately, not waiting for the end of the pipeline. Note that results are buffered on the server side; the server flushes that buffer when a synchronization point is established.
See also
The PostgreSQL documentation about the pipeline mode contains many details around when it is most useful to use it, and about error management and interaction with transactions.
Client-server messages flow #
In order to understand better how the pipeline mode works, we should take a closer look at the PostgreSQL client-server message flow .
During normal querying, each statement is transmitted by the client to the server as a stream of request messages, terminating with a Sync message to tell it that it should process the messages sent so far. The server will execute the statement and describe the results back as a stream of messages, terminating with a ReadyForQuery , telling the client that it may now send a new query.
For example, the statement (returning no result):
conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])
results in the following two groups of messages:
Direction | Message
---|---
Python ▶ PostgreSQL | Parse, Bind, Describe, Execute, Sync
PostgreSQL ◀ Python | ParseComplete, BindComplete, NoData, CommandComplete, ReadyForQuery
and the query:
conn.execute("SELECT data FROM mytable WHERE id = %s", [1])
results in the two groups of messages:
Direction | Message
---|---
Python ▶ PostgreSQL | Parse, Bind, Describe, Execute, Sync
PostgreSQL ◀ Python | ParseComplete, BindComplete, RowDescription, DataRow, CommandComplete, ReadyForQuery
The two statements, sent consecutively, pay the communication overhead four times, once per leg.
The pipeline mode allows the client to combine several operations in longer streams of messages to the server, then to receive more than one response in a single batch. If we execute the two operations above in a pipeline:
with conn.pipeline():
    conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])
    conn.execute("SELECT data FROM mytable WHERE id = %s", [1])
they will result in a single roundtrip between the client and the server:
Direction | Message
---|---
Python ▶ PostgreSQL | Parse, Bind, Describe, Execute (INSERT); Parse, Bind, Describe, Execute (SELECT); Sync
PostgreSQL ◀ Python | ParseComplete, BindComplete, NoData, CommandComplete (INSERT); ParseComplete, BindComplete, RowDescription, DataRow, CommandComplete (SELECT); ReadyForQuery
Pipeline mode usage #
Psycopg supports the pipeline mode via the Connection.pipeline() method. The method is a context manager: entering the with block yields a Pipeline object. At the end of the block, the connection resumes the normal operation mode.
Within the pipeline block, you can use one or more cursors normally to execute several operations, using Connection.execute() , Cursor.execute() and executemany() .
>>> with conn.pipeline():
...     conn.execute("INSERT INTO mytable VALUES (%s)", ["hello"])
...     with conn.cursor() as cur:
...         cur.execute("INSERT INTO othertable VALUES (%s)", ["world"])
...         cur.executemany(
...             "INSERT INTO elsewhere VALUES (%s)",
...             [("one",), ("two",), ("four",)])
Unlike in normal mode, Psycopg will not wait for the server to receive the result of each query; the client will receive results in batches when the server flushes its output buffer.
When a flush (or a sync) is performed, all pending results are sent back to the cursors which executed them. If a cursor had run more than one query, it will receive more than one result; results after the first will be available, in their execution order, using nextset() :
>>> with conn.pipeline():
...     with conn.cursor() as cur:
...         cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["hello"])
...         cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["world"])
...         while True:
...             print(cur.fetchall())
...             if not cur.nextset():
...                 break
[(1, 'hello')]
[(2, 'world')]
If any statement encounters an error, the server aborts the current transaction and will not execute any subsequent command in the queue until the next synchronization point; a PipelineAborted exception is raised for each such command. Query processing resumes after the synchronization point.
Warning
Certain features are not available in pipeline mode, including:
-
COPY is not supported in pipeline mode by PostgreSQL.
-
Cursor.stream() doesn’t make sense in pipeline mode (its job is the opposite of batching!)
-
ServerCursor are currently not implemented in pipeline mode.
Note
Starting from Psycopg 3.1, executemany() makes use internally of the pipeline mode; as a consequence there is no need to handle a pipeline block just to call executemany() once.
Synchronization points #
Flushing query results to the client can happen either when a synchronization point is established by Psycopg:
-
using the Pipeline.sync() method;
-
on Connection.commit() or rollback() ;
-
at the end of a Pipeline block;
-
possibly when opening a nested Pipeline block;
-
using a fetch method such as Cursor.fetchone() (which only flushes the query but doesn’t issue a Sync and doesn’t reset a pipeline state error).
The server might perform a flush on its own initiative, for instance when the output buffer is full.
Note that, even in autocommit , the server wraps the statements sent in pipeline mode in an implicit transaction, which will be only committed when the Sync is received. As such, a failure in a group of statements will probably invalidate the effect of statements executed after the previous Sync, and will propagate to the following Sync.
For example, in the following block:
>>> with psycopg.connect(autocommit=True) as conn:
...     with conn.pipeline() as p, conn.cursor() as cur:
...         try:
...             cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"])
...             cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"])
...             conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"])
...             p.sync()
...         except psycopg.errors.UndefinedTable:
...             pass
...         cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["four"])
there will be an error in the block, relation "no_such_table" does not exist , caused by the insert two , but probably raised by the sync() call. At the end of the block, the table will contain:

=# SELECT * FROM mytable;
+----+------+
| id | data |
+----+------+
|  2 | four |
+----+------+
(1 row)
because:
-
the value 1 of the sequence is consumed by the statement one , but the record is discarded because of the error in the same implicit transaction;
-
the statement three is not executed because the pipeline is aborted (so it doesn’t consume a sequence item);
-
the statement four is executed with success after the Sync has terminated the failed transaction.
Warning
The exact Python statement where an exception caused by a server error is raised is somewhat arbitrary: it depends on when the server flushes its buffered result.
If you want to make sure that a group of statements is applied atomically by the server, do make use of transaction methods such as commit() or transaction() : these methods will also sync the pipeline and raise an exception if there was any error in the commands executed so far.
The fine prints #
Warning
The Pipeline mode is an experimental feature.
Its behaviour, especially around error conditions and concurrency, hasn’t been explored as much as the normal request-response messages pattern, and its async nature makes it inherently more complex.
As we gain more experience and feedback (which is welcome), we might find bugs and shortcomings forcing us to change the current interface or behaviour.
The pipeline mode is available on any currently supported PostgreSQL version, but, in order to make use of it, the client must use a libpq from PostgreSQL 14 or higher. You can use Pipeline.is_supported() to make sure your client has the right library.
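For instance, a minimal sketch of such a check, assuming an open connection conn (the query is illustrative):

if psycopg.Pipeline.is_supported():
    with conn.pipeline():
        conn.execute("SELECT 1")
else:
    # libpq older than 14: fall back to plain request-response queries
    conn.execute("SELECT 1")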
Psycopg 3 API #
This section is a reference for all the public objects exposed by the psycopg module. For a more conceptual description you can take a look at Getting started with Psycopg 3 and More advanced topics .
The psycopg module #
Psycopg implements the Python Database DB API 2.0 specification . As such it also exposes the module-level objects required by the specifications.
- psycopg. connect ( conninfo = '' , * , autocommit = False , prepare_threshold = 5 , row_factory = None , cursor_factory = None , context = None , ** kwargs ) #
-
Connect to a database server and return a new Connection instance.
- Return type :
-
Connection
[Any
]
This is an alias of the class method Connection.connect : see its documentation for details.
If you need an asynchronous connection use AsyncConnection.connect instead.
Exceptions
The standard DBAPI exceptions are exposed both by the psycopg module and by the psycopg.errors module. The latter also exposes more specific exceptions, mapping to the database error states (see SQLSTATE exceptions ).

Exception
|__ Warning
|__ Error
    |__ InterfaceError
    |__ DatabaseError
        |__ DataError
        |__ OperationalError
        |__ IntegrityError
        |__ InternalError
        |__ ProgrammingError
        |__ NotSupportedError
- psycopg. adapters #
-
The default adapters map establishing how Python and PostgreSQL types are converted into each other.
This map is used as a template when new connections are created, using psycopg.connect() . Its ~psycopg.adapt.AdaptersMap.types attribute is a ~psycopg.types.TypesRegistry containing information about every PostgreSQL builtin type, useful for adaptation customisation (see Data adaptation configuration ):
>>> psycopg.adapters.types["int4"]
<TypeInfo: int4 (oid: 23, array oid: 1007)>
- Type :
-
~psycopg.adapt.AdaptersMap
Connection classes #
The Connection and AsyncConnection classes are the main wrappers for a PostgreSQL database session. You can imagine them similar to a psql session.
One of the differences compared to psql is that a Connection usually handles a transaction automatically: other sessions will not be able to see the changes until you have committed them, more or less explicitly. Take a look at Transactions management for the details.
The Connection class #
- class psycopg. Connection ( pgconn , row_factory=<function tuple_row> ) #
-
Wrapper for a connection to the database.
This class implements a DBAPI-compliant interface . It is what you want to use if you write a "classic", blocking program (eventually using threads or Eventlet/gevent for concurrency). If your program uses asyncio you might want to use AsyncConnection instead.
Connections behave as context managers: on block exit, the current transaction will be committed (or rolled back, in case of exception) and the connection will be closed.
- classmethod connect ( conninfo = '' , * , autocommit = False , prepare_threshold = 5 , row_factory = None , cursor_factory = None , context = None , ** kwargs ) #
-
Connect to a database server and return a new Connection instance.
- Return type :
-
Connection
[Any
] - Parameters :
-
-
conninfo - The connection string (a postgresql:// url or a list of key=value pairs) to specify where and how to connect.
kwargs - Further parameters specifying the connection string. They override the ones specified in !conninfo .
-
autocommit - If !True don’t start transactions automatically. See Transactions management for details.
-
row_factory - The row factory specifying what type of records to create fetching data (default: ~psycopg.rows.tuple_row() ). See Row factories for details.
-
cursor_factory - Initial value for the cursor_factory attribute of the connection (new in Psycopg 3.1).
-
prepare_threshold - Initial value for the prepare_threshold attribute of the connection (new in Psycopg 3.1).
-
More specialized use:
- Parameters :
-
context - A context to copy the initial adapters configuration from. It might be an ~psycopg.adapt.AdaptersMap with customized loaders and dumpers, used as a template to create several connections. See Data adaptation configuration for further details.
This method is also aliased as psycopg.connect() .
See also
-
the list of the accepted connection parameters
-
the environment variables affecting connection
Changed in version 3.1: added !prepare_threshold and !cursor_factory parameters.
- close ( ) #
-
Close the database connection.
Note
You can use:
with psycopg.connect() as conn: ...
to close the connection automatically when the block is exited. See Connection context .
- closed #
-
!True if the connection is closed.
- broken #
-
!True if the connection was interrupted.
A broken connection is always closed , but wasn’t closed in a clean way, such as using close() or a !with block.
- cursor ( * , binary : bool = False , row_factory : RowFactory | None = None ) → Cursor #
- cursor ( name : str , * , binary : bool = False , row_factory : RowFactory | None = None , scrollable : bool | None = None , withhold : bool = False ) → ServerCursor
-
Return a new cursor to send commands and queries to the connection.
- Parameters :
-
-
name - If not specified create a client-side cursor, if specified create a server-side cursor. See Cursor types for details.
-
binary - If !True return binary values from the database. All the types returned by the query must have a binary loader. See Binary parameters and results for details.
-
row_factory - If specified override the row_factory set on the connection. See Row factories for details.
-
scrollable - Specify the ~ServerCursor.scrollable property of the server-side cursor created.
-
withhold - Specify the ~ServerCursor.withhold property of the server-side cursor created.
-
- Returns :
-
A cursor of the class specified by cursor_factory (or server_cursor_factory if !name is specified).
Note
You can use:
with conn.cursor() as cur: ...
to close the cursor automatically when the block is exited.
-
cursor_factory
:
Type
[Cursor
[TypeVar
(Row
, covariant=True)]] # -
The type, or factory function, returned by cursor() and execute() .
Default is psycopg.Cursor .
-
server_cursor_factory
:
Type
[ServerCursor
[TypeVar
(Row
, covariant=True)]] # -
The type, or factory function, returned by cursor() when a name is specified.
Default is psycopg.ServerCursor .
-
row_factory
:
RowFactory
[TypeVar
(Row
, covariant=True)] # -
The row factory defining the type of rows returned by ~Cursor.fetchone() and the other cursor fetch methods.
The default is ~psycopg.rows.tuple_row , which means that the fetch methods will return simple tuples.
See also
See Row factories for details about defining the objects returned by cursors.
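For instance, a minimal sketch using the builtin dict_row factory, assuming an open connection conn (the query is illustrative):

from psycopg.rows import dict_row

conn.row_factory = dict_row  # affects cursors created afterwards
with conn.cursor() as cur:
    cur.execute("SELECT 1 AS value")
    cur.fetchone()
    # {'value': 1}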
- execute ( query , params = None , * , prepare = None , binary = False ) #
-
Execute a query and return a cursor to read its results.
- Return type :
- Parameters :
-
-
query ( !str , !bytes , sql.SQL , or sql.Composed ) - The query to execute.
-
params ( Sequence or Mapping ) - The parameters to pass to the query, if any.
-
prepare - Force ( !True ) or disallow ( !False ) preparation of the query. By default ( !None ) prepare automatically. See Prepared statements .
-
binary - If !True the cursor will return binary values from the database. All the types returned by the query must have a binary loader. See Binary parameters and results for details.
-
The method simply creates a Cursor instance, ~Cursor.execute() the query requested, and returns it.
See Passing parameters to SQL queries for all the details about executing queries.
- pipeline ( ) #
-
Switch the connection into pipeline mode.
The method is a context manager: you should call it using:
with conn.pipeline() as p: ...
At the end of the block, a synchronization point is established and the connection returns in normal mode.
You can call the method recursively from within a pipeline block. Innermost blocks will establish a synchronization point on exit, but pipeline mode will be kept until the outermost block exits.
See Pipeline mode support for details.
New in version 3.1.
Transaction management methods
For details see Transactions management .
- commit ( ) #
-
Commit any pending transaction to the database.
- rollback ( ) #
-
Roll back to the start of any pending transaction.
- transaction ( savepoint_name = None , force_rollback = False ) #
-
Start a context block with a new transaction or nested transaction.
- Parameters :
- Return type :
Note
The method must be called with a syntax such as:
with conn.transaction():
    ...

with conn.transaction() as tx:
    ...
The latter is useful if you need to interact with the Transaction object. See Transaction contexts for details.
Inside a transaction block it will not be possible to call commit() or rollback() .
- autocommit #
-
The autocommit state of the connection.
The property is writable for sync connections, read-only for async ones: you should call await AsyncConnection.set_autocommit(value) instead.
The following three properties control the characteristics of new transactions. See Transaction characteristics for details.
- isolation_level #
-
The isolation level of the new transactions started on the connection.
!None means use the default set in the default_transaction_isolation configuration parameter of the server.
- read_only #
-
The read-only state of the new transactions started on the connection.
!None means use the default set in the default_transaction_read_only configuration parameter of the server.
- deferrable #
-
The deferrable state of the new transactions started on the connection.
!None means use the default set in the default_transaction_deferrable configuration parameter of the server.
Checking and configuring the connection state
- pgconn : psycopg.pq.PGconn #
-
The ~pq.PGconn libpq connection wrapper underlying the !Connection .
It can be used to send low level commands to PostgreSQL and access features not currently wrapped by Psycopg.
- info #
-
A ConnectionInfo attribute to inspect connection properties.
- prepare_threshold #
-
Number of times a query is executed before it is prepared.
-
If it is set to 0, every query is prepared the first time it is executed.
-
If it is set to !None , prepared statements are disabled on the connection.
Default value: 5
See Prepared statements for details.
-
- prepared_max #
-
Maximum number of prepared statements on the connection.
Default value: 100
If more queries need to be prepared, old ones are deallocated .
Methods you can use to do something cool
- cancel ( ) #
-
Cancel the current operation on the connection.
- notifies ( ) #
-
Yield Notify objects as soon as they are received from the database.
Notifies are received after using
LISTEN
in a connection, when any sessions in the database generates aNOTIFY
on one of the listened channels.
- add_notify_handler ( callback ) #
-
Register a callable to be invoked whenever a notification is received.
- Parameters :
-
callback ( Callable [ [ Notify ] , None ] ) - the callback to call upon notification received.
See Asynchronous notifications for details.
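A minimal sketch of a notification callback, assuming an open connection conn (the channel name is illustrative):

def on_notify(notify):
    # notify is a psycopg.Notify instance
    print(f"pid={notify.pid} channel={notify.channel} payload={notify.payload}")

conn.add_notify_handler(on_notify)
conn.execute("LISTEN mychannel")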
- remove_notify_handler ( callback ) #
-
Unregister a notification callable previously registered.
- Parameters :
-
callback ( Callable [ [ Notify ] , None ] ) - the callback to remove.
- add_notice_handler ( callback ) #
-
Register a callable to be invoked when a notice message is received.
- Parameters :
-
callback ( Callable [ [ Diagnostic ] , None ] ) - the callback to call upon message received.
See Server messages for details.
- remove_notice_handler ( callback ) #
-
Unregister a notice message callable previously registered.
- Parameters :
-
callback ( Callable [ [ Diagnostic ] , None ] ) - the callback to remove.
- fileno ( ) #
-
Return the file descriptor of the connection.
This function allows to use the connection as file-like object in functions waiting for readiness, such as the ones defined in the selectors module.
- Return type :
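For instance, a minimal sketch waiting up to five seconds for the connection socket to become readable, assuming an open connection conn:

import selectors

sel = selectors.DefaultSelector()
sel.register(conn, selectors.EVENT_READ)  # possible because conn exposes fileno()
events = sel.select(timeout=5.0)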
Two-Phase Commit support methods
New in version 3.1.
See also
Two-Phase Commit protocol support for an introductory explanation of these methods.
- xid ( format_id , gtrid , bqual ) #
-
Returns a Xid to pass to the !tpc_*() methods of this connection.
The argument types and constraints are explained in Two-Phase Commit protocol support .
The values passed to the method will be available on the returned object as the members ~Xid.format_id , ~Xid.gtrid , ~Xid.bqual .
- Return type :
- tpc_begin ( xid ) #
-
Begin a TPC transaction with the given transaction ID !xid .
This method should be called outside of a transaction (i.e. nothing may have executed since the last commit() or rollback() and ~ConnectionInfo.transaction_status is ~pq.TransactionStatus.IDLE ).
Furthermore, it is an error to call !commit() or !rollback() within the TPC transaction: in this case a ProgrammingError is raised.
The !xid may be either an object returned by the xid() method or a plain string: the latter allows to create a transaction using the provided string as PostgreSQL transaction id. See also tpc_recover() .
- tpc_prepare ( ) #
-
Perform the first phase of a transaction started with tpc_begin() .
A ProgrammingError is raised if this method is used outside of a TPC transaction.
After calling !tpc_prepare() , no statements can be executed until tpc_commit() or tpc_rollback() will be called.
See also
The
PREPARE TRANSACTION
PostgreSQL command.
- tpc_commit ( xid = None ) #
-
Commit a prepared two-phase transaction.
When called with no arguments, !tpc_commit() commits a TPC transaction previously prepared with tpc_prepare() .
If !tpc_commit() is called prior to !tpc_prepare() , a single phase commit is performed. A transaction manager may choose to do this if only a single resource is participating in the global transaction.
When called with a transaction ID !xid , the database commits the given transaction. If an invalid transaction ID is provided, a ProgrammingError will be raised. This form should be called outside of a transaction, and is intended for use in recovery.
On return, the TPC transaction is ended.
See also
The
COMMIT PREPARED
PostgreSQL command.
- tpc_rollback ( xid = None ) #
-
Roll back a prepared two-phase transaction.
When called with no arguments, !tpc_rollback() rolls back a TPC transaction. It may be called before or after tpc_prepare() .
When called with a transaction ID !xid , it rolls back the given transaction. If an invalid transaction ID is provided, a ProgrammingError is raised. This form should be called outside of a transaction, and is intended for use in recovery.
On return, the TPC transaction is ended.
See also
The
ROLLBACK PREPARED
PostgreSQL command.
- tpc_recover ( ) #
-
Returns a list of Xid representing pending transactions, suitable for use with tpc_commit() or tpc_rollback() .
If a transaction was not initiated by Psycopg, the returned Xids will have attributes ~Xid.format_id and ~Xid.bqual set to !None and the ~Xid.gtrid set to the PostgreSQL transaction ID: such Xids are still usable for recovery. Psycopg uses the same algorithm of the PostgreSQL JDBC driver to encode a XA triple in a string, so transactions initiated by a program using such driver should be unpacked correctly.
Xids returned by !tpc_recover() also have extra attributes ~Xid.prepared , ~Xid.owner , ~Xid.database populated with the values read from the server.
See also
the pg_prepared_xacts system view.
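A minimal sketch of the two-phase flow, assuming an open connection conn (the transaction identifiers and the table are illustrative):

xid = conn.xid(format_id=1, gtrid="batch-2024-01", bqual="node-1")
conn.tpc_begin(xid)
conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["tpc"])
conn.tpc_prepare()
conn.tpc_commit()  # or conn.tpc_rollback() to discard the prepared transaction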
The AsyncConnection class #
- class psycopg. AsyncConnection ( pgconn , row_factory=<function tuple_row> ) #
-
Asynchronous wrapper for a connection to the database.
This class implements a DBAPI-inspired interface, with all the blocking methods implemented as coroutines. Unless specified otherwise, non-blocking methods are shared with the Connection class.
The following methods have the same behaviour of the matching !Connection methods, but should be called using the await keyword.
- async classmethod connect ( conninfo = '' , * , autocommit = False , prepare_threshold = 5 , context = None , row_factory = None , cursor_factory = None , ** kwargs ) #
-
- Return type :
-
AsyncConnection
[Any
]
Changed in version 3.1: Automatically resolve domain names asynchronously. In previous versions, name resolution blocks, unless the !hostaddr parameter is specified, or the ~psycopg._dns.resolve_hostaddr_async() function is used.
- async close ( ) #
-
Note
You can use
async with
to close the connection automatically when the block is exited, but be careful about the async quirkness: see with async connections for details.
- cursor ( * , binary : bool = False , row_factory : RowFactory | None = None ) → AsyncCursor #
- cursor ( name : str , * , binary : bool = False , row_factory : RowFactory | None = None , scrollable : bool | None = None , withhold : bool = False ) → AsyncServerCursor
-
Note
You can use:
async with conn.cursor() as cur: ...
to close the cursor automatically when the block is exited.
-
cursor_factory
:
Type
[AsyncCursor
[TypeVar
(Row
, covariant=True)]] # -
Default is psycopg.AsyncCursor .
-
server_cursor_factory
:
Type
[AsyncServerCursor
[TypeVar
(Row
, covariant=True)]] # -
Default is psycopg.AsyncServerCursor .
-
row_factory
:
AsyncRowFactory
[TypeVar
(Row
, covariant=True)] #
- async execute ( query , params = None , * , prepare = None , binary = False ) #
-
- Return type :
-
AsyncCursor
[TypeVar
(Row
, covariant=True)]
- pipeline ( ) #
-
Context manager to switch the connection into pipeline mode.
- Return type :
Note
It must be called as:
async with conn.pipeline() as p: ...
- async commit ( ) #
- async rollback ( ) #
- transaction ( savepoint_name = None , force_rollback = False ) #
-
Start a context block with a new transaction or nested transaction.
- Return type :
Note
It must be called as:
async with conn.transaction() as tx: ...
- async notifies ( ) #
-
- Return type :
-
AsyncGenerator
[Notify
,None
]
- async set_autocommit ( value ) #
-
Async version of the ~Connection.autocommit setter.
- async set_isolation_level ( value ) #
-
Async version of the ~Connection.isolation_level setter.
- async set_read_only ( value ) #
-
Async version of the ~Connection.read_only setter.
- async set_deferrable ( value ) #
-
Async version of the ~Connection.deferrable setter.
- async tpc_prepare ( ) #
- async tpc_commit ( xid = None ) #
- async tpc_rollback ( xid = None ) #
Cursor classes #
The Cursor and AsyncCursor classes are the main objects to send commands to a PostgreSQL database session. They are normally created by the connection’s cursor() method.
Using the name parameter on cursor() will create a ServerCursor or AsyncServerCursor , which can be used to retrieve partial results from a database.
A Connection can create several cursors, but only one at a time can perform operations, so they are not the best way to achieve parallelism (you may want to operate with several connections instead). All the cursors on the same connection have a view of the same session, so they can see each other’s uncommitted data.
The
Cursor
class
#
- class psycopg. Cursor ( connection , * , row_factory = None ) #
-
This class implements a DBAPI-compliant interface . It is what the classic Connection.cursor() method returns. AsyncConnection.cursor() will create instead AsyncCursor objects, which have the same set of methods but expose an asyncio interface and require async and await keywords to operate.
Cursors behave as context managers: on block exit they are closed and further operation will not be possible. Closing a cursor will not terminate a transaction or a session though.
- connection : Connection #
-
The connection this cursor is using.
- close ( ) #
-
Close the current cursor and free associated resources.
Note
You can use:
with conn.cursor() as cur: ...
to close the cursor automatically when the block is exited. See Main objects in Psycopg 3 .
Methods to send commands
- execute ( query , params = None , * , prepare = None , binary = None ) #
-
Execute a query or command to the database.
- Return type :
-
TypeVar
(_Self
, bound= Cursor[Any]) - Parameters :
-
-
query ( !str , !bytes , sql.SQL , or sql.Composed ) - The query to execute.
-
params ( Sequence or Mapping ) - The parameters to pass to the query, if any.
-
prepare - Force ( !True ) or disallow ( !False ) preparation of the query. By default ( !None ) prepare automatically. See Prepared statements .
-
binary - Specify whether the server should return data in binary format ( !True ) or in text format ( !False ). By default ( !None ) return data as requested by the cursor’s ~Cursor.format .
-
Return the cursor itself, so that it will be possible to chain a fetch operation after the call.
See Passing parameters to SQL queries for all the details about executing queries.
Changed in version 3.1: The query argument must be a ~typing.LiteralString . If you need to compose a query dynamically, please use sql.SQL and related objects.
See PEP 675 for details.
- executemany ( query , params_seq , * , returning = False ) #
-
Execute the same command with a sequence of input data.
- Parameters :
-
-
query ( !str , !bytes , sql.SQL , or sql.Composed ) - The query to execute
-
params_seq ( Sequence of Sequences or Mappings ) - The parameters to pass to the query
-
returning ( !bool ) - If !True , fetch the results of the queries executed
-
This is more efficient than performing separate queries, but in case of several INSERT (and with some SQL creativity for massive UPDATE too) you may consider using copy() .
If the queries return data you want to read (e.g. when executing an INSERT ... RETURNING or a SELECT with a side-effect), you can specify !returning=True ; the results will be available in the cursor’s state and can be read using fetchone() and similar methods. Each input parameter will produce a separate result set: use nextset() to read the results of the queries after the first one.
The value of rowcount is set to the cumulated number of rows affected by queries; except when using !returning=True , in which case it is set to the number of rows in the current result set (i.e. the first one, until nextset() gets called).
See Passing parameters to SQL queries for all the details about executing queries.
Changed in version 3.1:
-
Added !returning parameter to receive query results.
-
Performance optimised by making use of the pipeline mode, when using libpq 14 or newer.
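A minimal sketch of reading the results back, assuming an open cursor cur (the table and values are illustrative):

cur.executemany(
    "INSERT INTO mytable (data) VALUES (%s) RETURNING id",
    [("one",), ("two",)],
    returning=True)
while True:
    print(cur.fetchone())  # one result set per input parameter
    if not cur.nextset():
        break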
- copy ( statement , params = None , * , writer = None ) #
-
Initiate a
COPY
operation and return an object to manage it.- Return type :
- Parameters :
-
-
statement ( !str , !bytes , sql.SQL , or sql.Composed ) - The copy operation to execute
-
params ( Sequence or Mapping ) - The parameters to pass to the statement, if any.
-
Note
The method must be called with:
with cursor.copy() as copy: ...
See Using COPY TO and COPY FROM for information about
COPY
.Changed in version 3.1: Added parameters support.
- stream ( query , params = None , * , binary = None ) #
-
Iterate row-by-row on a result from the database.
This command is similar to execute + iter; however it supports endless data streams. The feature is not available in PostgreSQL, but some implementations exist: Materialize TAIL and CockroachDB CHANGEFEED for instance.
The feature, and the API supporting it, are still experimental. Beware… 👀
The parameters are the same of execute() .
Warning
Failing to consume the iterator entirely will result in a connection left in ~psycopg.ConnectionInfo.transaction_status ~pq.TransactionStatus.ACTIVE state: this connection will refuse to receive further commands (with a message such as another command is already in progress ).
If there is a chance that the generator is not consumed entirely, in order to restore the connection to a working state you can call ~generator.close on the generator object returned by !stream() . The contextlib.closing function might be particularly useful to make sure that !close() is called:
with closing(cur.stream("select generate_series(1, 10000)")) as gen:
    for rec in gen:
        something(rec)  # might fail
Without calling !close() , in case of error, the connection will be !ACTIVE and unusable. If !close() is called, the connection might be !INTRANS or !INERROR , depending on whether the server managed to send the entire resultset to the client. An autocommit connection will be !IDLE instead.
- format #
-
The format of the data returned by the queries. It can be selected initially e.g. specifying Connection.cursor !(binary=True) and changed during the cursor’s lifetime. It is also possible to override the value for single queries, e.g. specifying execute !(binary=True) .
- Type :
-
pq.Format
- Default :
-
~pq.Format.TEXT
See also
Methods to retrieve results
Fetch methods are only available if the last operation produced results, e.g. a
SELECT
or a command withRETURNING
. They will raise an exception if used with operations that don’t return result, such as anINSERT
with noRETURNING
or anALTER TABLE
.Note
Cursors are iterable objects, so just using the:
for record in cursor: ...
syntax will iterate on the records in the current recordset.
- row_factory #
-
Writable attribute to control how result rows are formed.
The property affects the objects returned by the fetchone() , fetchmany() , fetchall() methods. The default ( ~psycopg.rows.tuple_row ) returns a tuple for each record fetched.
See Row factories for details.
- fetchone ( ) #
-
Return the next record from the current recordset.
Return !None if the recordset is finished.
- Return type :
-
Optional[Row], with Row defined by row_factory
- fetchmany ( size = 0 ) #
-
Return the next !size records from the current recordset.
!size default to !self.arraysize if not specified.
- Return type :
-
Sequence[Row], with Row defined by row_factory
- fetchall ( ) #
-
Return all the remaining records from the current recordset.
- Return type :
-
Sequence[Row], with Row defined by row_factory
- nextset ( ) #
-
Move to the result set of the next query executed through executemany() or to the next result set if execute() returned more than one.
Return !True if a new result is available, which will be the one methods !fetch*() will operate on.
- scroll ( value , mode = 'relative' ) #
-
Move the cursor in the result set to a new position according to mode.
If !mode is
'relative'
(default), !value is taken as offset to the current position in the result set; if set to'absolute'
, !value states an absolute target position.Raise !IndexError in case a scroll operation would leave the result set. In this case the position will not change.
- pgresult : psycopg.pq.PGresult | None #
-
The result returned by the last query and currently exposed by the cursor, if available, else !None .
It can be used to obtain low level info about the last query result and to access to features not currently wrapped by Psycopg.
Information about the data
- description #
-
A list of Column objects describing the current resultset.
!None if the current resultset didn’t return tuples.
- statusmessage #
-
The command status tag from the last SQL command executed.
!None if the cursor doesn’t have a result available.
This is the status tag you typically see in psql after a successful command, such as CREATE TABLE or UPDATE 42 .
- rowcount #
-
Number of records affected by the precedent operation.
From executemany() , unless called with !returning=True , this is the cumulated number of rows affected by executed commands.
- rownumber #
-
Index of the next row to fetch in the current result.
!None if there is no result to fetch.
- _query #
-
A helper object used to convert queries and parameters before sending them to PostgreSQL.
Note
This attribute is exposed because it might be helpful to debug problems when the communication between Python and PostgreSQL doesn’t work as expected. For this reason, the attribute is available when a query fails too.
Warning
You shouldn’t consider it part of the public interface of the object: it might change without warnings.
Except this warning, I guess.
If you would like to build reliable features using this object, please get in touch so we can try and design an useful interface for it.
Among the properties currently exposed by this object:
-
!query ( !bytes ): the query effectively sent to PostgreSQL. It will have Python placeholders (
%s
-style) replaced with PostgreSQL ones ($1
,$2
-style). -
!params (sequence of !bytes ): the parameters passed to PostgreSQL, adapted to the database format.
-
!types (sequence of !int ): the OID of the parameters passed to PostgreSQL.
-
!formats (sequence of pq.Format ): whether the parameter format is text or binary.
-
The ClientCursor class #
See also
See Client-side-binding cursors for details.
- class psycopg. ClientCursor ( connection , * , row_factory = None ) #
-
This Cursor subclass has exactly the same interface of its parent class, but, instead of sending query and parameters separately to the server, it merges them on the client and sends them as a non-parametric query on the server. This allows, for instance, to execute parametrized data definition statements and other problematic queries .
New in version 3.1.
- mogrify ( query , params = None ) #
-
Return the query and parameters merged.
Parameters are adapted and merged to the query the same way that !execute() would do.
- Return type :
- Parameters :
-
-
query ( !str , !bytes , sql.SQL , or sql.Composed ) - The query to execute.
-
params ( Sequence or Mapping ) - The parameters to pass to the query, if any.
-
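A minimal sketch of mogrify(), assuming an open connection conn (the query and the exact output shown are illustrative):

cur = psycopg.ClientCursor(conn)
cur.mogrify("SELECT %s, %s", ["hello", 42])
# hypothetically: "SELECT 'hello', 42"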
The ServerCursor class #
See also
See Server-side cursors for details.
- class psycopg. ServerCursor ( connection , name , * , row_factory = None , scrollable = None , withhold = False ) #
-
This class also implements a DBAPI-compliant interface . It is created by Connection.cursor() specifying the !name parameter. Using this object results in the creation of an equivalent PostgreSQL cursor in the server. DBAPI-extension methods (such as ~Cursor.copy() or ~Cursor.stream() ) are not implemented on this object: use a normal Cursor instead.
Most attribute and methods behave exactly like in Cursor , here are documented the differences:
- name #
-
The name of the cursor.
- scrollable #
-
Whether the cursor is scrollable or not.
If !None leave the choice to the server. Use !True if you want to use scroll() on the cursor.
See also
The PostgreSQL DECLARE statement documentation for the description of
[NO] SCROLL
.
- withhold #
-
If the cursor can be used after the creating transaction has committed.
See also
The PostgreSQL DECLARE statement documentation for the description of
{WITH|WITHOUT} HOLD
.
- close ( ) #
-
Close the current cursor and free associated resources.
Warning
Closing a server-side cursor is more important than closing a client-side one because it also releases the resources on the server, which otherwise might remain allocated until the end of the session (memory, locks). Using the pattern:
with conn.cursor(): ...
is especially useful so that the cursor is closed at the end of the block.
- execute ( query , params = None , * , binary = None , ** kwargs ) #
-
Open a cursor to execute a query to the database.
- Return type :
-
TypeVar
(_Self
, bound= ServerCursor[Any]) - Parameters :
-
-
query ( !str , !bytes , sql.SQL , or sql.Composed ) - The query to execute.
-
params ( Sequence or Mapping ) - The parameters to pass to the query, if any.
-
binary - Specify whether the server should return data in binary format ( !True ) or in text format ( !False ). By default ( !None ) return data as requested by the cursor’s ~Cursor.format .
-
Create a server cursor with given !name and the !query in argument.
If using DECLARE is not appropriate (for instance because the cursor is returned by calling a stored procedure) you can avoid using !execute() , create the cursor in other ways, and use the !fetch*() methods directly instead. See "Stealing" an existing cursor for an example.
Using !execute() more than once will close the previous cursor and open a new one with the same name.
- executemany ( query , params_seq , * , returning = True ) #
-
Method not implemented for server-side cursors.
- fetchone ( ) #
-
Return the next record from the current recordset.
Return !None if the recordset is finished.
- Return type :
-
Optional[Row], with Row defined by row_factory
- fetchmany ( size = 0 ) #
-
Return the next !size records from the current recordset.
!size default to !self.arraysize if not specified.
- Return type :
-
Sequence[Row], with Row defined by row_factory
- fetchall ( ) #
-
Return all the remaining records from the current recordset.
- Return type :
-
Sequence[Row], with Row defined by row_factory
These methods use the FETCH SQL statement to retrieve some of the records from the cursor’s current position.
Note
You can also iterate on the cursor to read its result one at time with:
for record in cur: ...
In this case, the records are not fetched one at time from the server but they are retrieved in batches of itersize to reduce the number of server roundtrips.
- itersize : int #
-
Number of records to fetch at time when iterating on the cursor. The default is 100.
- scroll ( value , mode = 'relative' ) #
-
Move the cursor in the result set to a new position according to mode.
If !mode is
'relative'
(default), !value is taken as offset to the current position in the result set; if set to'absolute'
, !value states an absolute target position.Raise !IndexError in case a scroll operation would leave the result set. In this case the position will not change.
This method uses the MOVE SQL statement to move the current position in the server-side cursor, which will affect following !fetch*() operations. If you need to scroll backwards you should probably call ~Connection.cursor() using scrollable=True .
Note that PostgreSQL doesn’t provide a reliable way to report when a cursor moves out of bound, so the method might not raise !IndexError when it happens, but it might rather stop at the cursor boundary.
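A minimal sketch of scrolling a server-side cursor, assuming an open connection conn (the cursor name and the query are illustrative):

cur = conn.cursor("curs", scrollable=True)
cur.execute("SELECT generate_series(1, 1000)")
first = cur.fetchmany(10)        # rows 1-10
cur.scroll(0, mode="absolute")   # rewind to the start of the result set
again = cur.fetchmany(10)        # rows 1-10 again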
The AsyncCursor class #
- class psycopg. AsyncCursor ( connection , * , row_factory = None ) #
-
This class implements a DBAPI-inspired interface, with all the blocking methods implemented as coroutines. Unless specified otherwise, non-blocking methods are shared with the Cursor class.
The following methods have the same behaviour of the matching !Cursor methods, but should be called using the await keyword.
- connection : AsyncConnection #
- async close ( ) #
-
Note
You can use:
async with conn.cursor(): ...
to close the cursor automatically when the block is exited.
- async execute ( query , params = None , * , prepare = None , binary = None ) #
-
- Return type :
-
TypeVar
(_Self
, bound= AsyncCursor[Any])
- async executemany ( query , params_seq , * , returning = False ) #
- copy ( statement , params = None , * , writer = None ) #
-
- Return type :
Note
The method must be called with:
async with cursor.copy() as copy: ...
- async stream ( query , params = None , * , binary = None ) #
-
- Return type :
-
AsyncIterator
[TypeVar
(Row
, covariant=True)]
Note
The method must be called with:
async for record in cursor.stream(query): ...
- async scroll ( value , mode = 'relative' ) #
Note
You can also use:
async for record in cursor: ...
to iterate on the async cursor results.
The AsyncClientCursor class #
- class psycopg. AsyncClientCursor ( connection , * , row_factory = None ) #
-
This class is the !async equivalent of the ClientCursor . The differences are the same shown in AsyncCursor .
New in version 3.1.
The AsyncServerCursor class #
- class psycopg. AsyncServerCursor ( connection , name , * , row_factory = None , scrollable = None , withhold = False ) #
-
This class implements a DBAPI-inspired interface as the AsyncCursor does, but wraps a server-side cursor like the ServerCursor class. It is created by AsyncConnection.cursor() specifying the !name parameter.
The following are the methods exposing a different (async) interface from the ServerCursor counterpart, but sharing the same semantics.
- async close ( ) #
-
Note
You can close the cursor automatically using:
async with conn.cursor("name") as cursor: ...
- async execute ( query , params = None , * , binary = None , ** kwargs ) #
-
- Return type :
-
TypeVar
(_Self
, bound= AsyncServerCursor[Any])
- async executemany ( query , params_seq , * , returning = True ) #
- async fetchall ( ) #
-
Note
You can also iterate on the cursor using:
async for record in cur: ...
- async scroll ( value , mode = 'relative' ) #