
From Python to Postgres in 2023

Back in 2019, Postgres developer/blogger Haki Benita wrote a long post about the fastest way to load data into PostgreSQL using Python and psycopg2. With four years of improvements to psycopg2, and a newer library psycopg (“psycopg3” is installed/imported as psycopg, with no number), I decided to revisit some of his tests and see if the fastest data load method has changed. I also added a new DuckDB-based test case to see how much overhead one of my new favorite toys adds to data cleaning and loading.

Defining the problem

The ground rules

I’m reimplementing the code from the 2019 post in both psycopg2 and psycopg. There are other choices out there these days, but if you’ve been using psycopg2, psycopg is your likely upgrade path since it will require fewer changes than coding to a brand new API.

When I mention the performance of an option in 2019, I copied that number from the original blog post: I did not try to install four-year-old versions of Python and psycopg2. So, there are some apples-and-oranges comparisons here. My laptop is an older (2014) MacBook Pro, so I’m not pitting 2023 top-of-the-line hardware against 2019 hardware. However, I’m using Postgres 15, so I’ve upgraded my database software as well as my Python version and libraries.

The data

I’m only going to touch on the dataset briefly; for more details, please follow the links to the original post.

There is a list of 32,500 beers, generated by querying an API over the network for 325 beers and then making a total of 100 copies of the list. This list is then stored in memory, which takes network latency out of the performance comparisons.

The data loaders can make a copy of this list (increasing their memory use), but if they iterate one item at a time, the memory used by the full list does not “count”. This is meant to simulate streaming the data vs downloading and processing it in bulk.
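
For orientation, the setup looks roughly like the sketch below. I’m paraphrasing the original post here: the API URL and paging details are its choices, which I haven’t re-verified, so treat them as illustrative.

import requests

def iter_beers_from_api(page_size: int = 100):
    """Pull every beer from the API, one page at a time."""
    session = requests.Session()
    page = 1
    while True:
        response = session.get(
            "https://api.punkapi.com/v2/beers",
            params={"page": page, "per_page": page_size},
        )
        response.raise_for_status()
        data = response.json()
        if not data:
            break
        yield from data
        page += 1

# 325 beers from the API, copied 100 times and held in memory: 32,500 rows,
# so network latency never shows up in the load timings.
beers = list(iter_beers_from_api()) * 100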

The destination table will only take 17 keys from the input data and two of those keys (first_brewed and volume) need some minor data cleaning.
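
The code samples below lean on two small helpers from the original post: parse_first_brewed, which turns the API’s “MM/YYYY” or “YYYY” strings into dates, and clean_csv_value, which renders values for COPY. Roughly (this is my sketch of them, not a verbatim copy):

import datetime
from typing import Any

def parse_first_brewed(text: str) -> datetime.date:
    """The API reports first_brewed as either 'MM/YYYY' or 'YYYY'."""
    parts = text.split("/")
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    if len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    raise ValueError(f"Unknown first_brewed format: {text!r}")

def clean_csv_value(value: Any) -> str:
    """COPY expects \\N for NULL; literal newlines would break the row format."""
    if value is None:
        return r"\N"
    return str(value).replace("\n", "\\n")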

The destination table

I’m using the same destination table definition as in the original blog post, but the “cursor” in the create_staging_table function could be from psycopg2 or psycopg.

# Imports used throughout the post
import io
from collections.abc import Iterator, Sequence
from typing import Any, Self

import psycopg
import psycopg2
import psycopg2.extensions

EitherConnection = psycopg2.extensions.connection | psycopg.Connection
# create_staging_table receives a cursor, so give it a cursor type
EitherCursor = psycopg2.extensions.cursor | psycopg.Cursor

def create_staging_table(cursor: EitherCursor) -> None:
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE UNLOGGED TABLE staging_beers (
            id                  INTEGER,
            name                TEXT,
            tagline             TEXT,
            first_brewed        DATE,
            description         TEXT,
            image_url           TEXT,
            abv                 DECIMAL,
            ibu                 DECIMAL,
            target_fg           DECIMAL,
            target_og           DECIMAL,
            ebc                 DECIMAL,
            srm                 DECIMAL,
            ph                  DECIMAL,
            attenuation_level   DECIMAL,
            brewers_tips        TEXT,
            contributed_by      TEXT,
            volume              INTEGER
        );
    """)

The profiler

I’m also stealing the time and memory profiler from the original post. The profiler wrapper runs each implementation twice: once to profile time and again to profile memory use.
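
For context, the decorator works roughly like this sketch (my reconstruction; the exact sampling interval and report formatting in the original may differ):

import time
from functools import wraps

from memory_profiler import memory_usage

def profile(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        # First run: wall-clock time only.
        start = time.perf_counter()
        fn(*args, **kwargs)
        elapsed = time.perf_counter() - start

        # Second run: sample memory (in MiB) while the function executes again.
        samples = memory_usage((fn, args, kwargs), interval=0.1)

        print(f"{fn.__name__}()")
        print(f"Time   {elapsed:.4}")
        print(f"Memory {max(samples) - min(samples)}")

    return wrapper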

One thing I did notice during my tests is that the memory usage was much more variable between different runs of the same test case, especially if I didn’t restart my notebook between trials.

The baseline

The baseline test case is to insert rows one by one. In 2019, this took two minutes to accomplish with psycopg2. Using the 2023 version of psycopg2, things got much better, and the new psycopg is better still.

@profile
def insert_one_by_one(
    connection: EitherConnection, beers: list[dict[str, Any]]
) -> None:
    """The insert-one-by-one implementation doesn't change
    between psycopg2 and psycopg"""
    
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute(
                """
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );""",
                {
                    **beer,
                    "first_brewed": parse_first_brewed(beer["first_brewed"]),
                    "volume": beer["volume"]["value"],
                },
            )

psycopg2 - 2019

insert_one_by_one()
Time   128.8
Memory 0.08203125

psycopg2 - 2023

insert_one_by_one()
Time   4.568
Memory 0.125

psycopg - 2023

insert_one_by_one()
Time   3.683
Memory 0.0078125

Executemany

The next test case in the original blog post used cursor.executemany to send many rows in one function call. One flavor marshals the entire list up front; the other uses a generator to send one row at a time to reduce the memory footprint. The insert_executemany and insert_executemany_iterator functions run the same code for psycopg2 and psycopg. Psycopg2-2023 beats psycopg2-2019 again, and the psycopg implementation shows further improvement, this time by a factor of almost 4x!

@profile
def insert_executemany_iterator(
    connection: EitherConnection, beers: list[dict[str, Any]]
) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
            """,
            # I didn't copy both implementations here, because they are 
            # so similar:
            # insert_executemany uses square-brackets [] to pre-compute the entire list
            # insert_executemany_iterator uses parens () to get a one-row-at-a-time generator
            (
                {
                    **beer,
                    "first_brewed": parse_first_brewed(beer["first_brewed"]),
                    "volume": beer["volume"]["value"],
                }
                for beer in beers
            ),
        )

psycopg2 - 2019

insert_executemany()
Time   124.7
Memory 2.765625

insert_executemany_iterator()
Time   129.3
Memory 0.0

psycopg2 - 2023

insert_executemany()
Time   4.279
Memory 16.70703125

insert_executemany_iterator()
Time   4.399
Memory 0.0

psycopg - 2023

insert_executemany()
Time   1.24
Memory 3.9453125

insert_executemany_iterator()
Time   1.231
Memory 0.0

Copy implementations

The original blog post also experimented with psycopg2’s execute_values and execute_batch functions, including trying different batch sizes. These approaches do not have a direct analog in psycopg, and there doesn’t seem to be a need for them given the massive improvement in executemany. However, if you are porting code from psycopg2, be aware that these functions no longer exist.
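
The port itself is mostly mechanical. A hedged sketch of the before and after (the two-column insert and the values are just for illustration):

# psycopg2: execute_values expands a single VALUES %s placeholder
from psycopg2.extras import execute_values

execute_values(
    cursor,
    "INSERT INTO staging_beers (id, name) VALUES %s",
    [(1, "Buzz"), (2, "Trashy Blonde")],
)

# psycopg: no execute_values or execute_batch; plain executemany takes their place
cursor.executemany(
    "INSERT INTO staging_beers (id, name) VALUES (%s, %s)",
    [(1, "Buzz"), (2, "Trashy Blonde")],
)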

I’m going to skip ahead to the fastest implementations, the ones based on the PostgreSQL COPY command.

Copying with psycopg2

The fastest approach in 2019 was to use psycopg2’s copy functions. There are two implementations: one uses a StringIO object to build a CSV file in memory, which is then copied to the table in one go; the other uses a custom subclass of io.TextIOBase to avoid holding the entire set of rows in memory, improving speed and memory use at the expense of additional code complexity.

@profile
def copy_stringio(
    connection: psycopg2.extensions.connection, beers: list[dict[str, Any]]
) -> None:
    """Build the entire CSV in an in-memory StringIO buffer,
    then hand it to psycopg2's copy_from in one call.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            csv_file_like_object.write(
                 "|".join(
                        map(
                            clean_csv_value,
                            (
                                beer["id"],
                                beer["name"],
                                beer["tagline"],
                                parse_first_brewed(beer["first_brewed"]).isoformat(),
                                beer["description"],
                                beer["image_url"],
                                beer["abv"],
                                beer["ibu"],
                                beer["target_fg"],
                                beer["target_og"],
                                beer["ebc"],
                                beer["srm"],
                                beer["ph"],
                                beer["attenuation_level"],
                                beer["contributed_by"],
                                beer["brewers_tips"],
                                beer["volume"]["value"],
                            ), # end tuple
                        )  # end map
                    )  # end join |
                + "\n"
            )  # end write

        csv_file_like_object.seek(0)
        cursor.copy_from(
                csv_file_like_object, 'staging_beers', sep='|'
            )

class StringIteratorIO(io.TextIOBase):
    def __init__(self: Self, iter8: Iterator[str]) -> None:
        self._iter = iter8
        self._buff = ''

    def readable(self: Self) -> bool:
        return True

    def _read1(self: Self, n: int | None = None) -> str:
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self: Self, n: int | None = None) -> str:
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)


@profile
def copy_string_iterator(
    connection: psycopg2.extensions.connection, beers: list[dict[str, Any]]
) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|')

Copying with psycopg

I could not use the same code for psycopg: psycopg2’s cursor.copy_from method is replaced by a new method, cursor.copy, which returns an object that you write the data to be copied into. This means you don’t really need to create a separate file-like object to accomplish the copy unless you need one for some other reason.

In order to force the rewritten copy_stringio method to send the entire file at once (to compare apples to apples), I did go ahead and create a StringIO object. However, for copy_string_iterator I just wrote directly to psycopg’s copy object. This simplifies the code from the 2019 implementation and still saves time and memory.

@profile
def copy_stringio(connection: psycopg.Connection, beers: list[dict[str, Any]]) -> None:
    """You don't really need to create a stringio file object to use modern psycopg's copy
    but you do need to carefully set all the arguments to copy if you're going
    to string-ify everything yourself.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        csv_file_like_object = io.StringIO()
        for beer in beers:
            csv_file_like_object.write(
                 "|".join(
                        map(
                            clean_csv_value,
                            (
                                beer["id"],
                                beer["name"],
                                beer["tagline"],
                                parse_first_brewed(beer["first_brewed"]).isoformat(),
                                beer["description"],
                                beer["image_url"],
                                beer["abv"],
                                beer["ibu"],
                                beer["target_fg"],
                                beer["target_og"],
                                beer["ebc"],
                                beer["srm"],
                                beer["ph"],
                                beer["attenuation_level"],
                                beer["contributed_by"],
                                beer["brewers_tips"],
                                beer["volume"]["value"],
                            ), # end tuple
                        )  # end map
                    )  # end join |
                    + "\n"
            )  # end write

        csv_file_like_object.seek(0)
        with cursor.copy(
            """COPY
            staging_beers
            FROM STDIN (
                FORMAT CSV,
                HEADER FALSE,
                DELIMITER '|',
                NULL '\\N'
            )""",
        ) as copy:
            copy.write(csv_file_like_object.getvalue())


@profile
def copy_string_iterator(
    connection: psycopg.Connection, beers: list[dict[str, Any]]
) -> None:
    """Rather than making the whole csv string at once, the for-loop
    yields it up one row at a time.
    """

    with connection.cursor() as cursor:
        create_staging_table(cursor)

        with cursor.copy(
            """COPY
            staging_beers FROM STDIN (
                FORMAT CSV,
                HEADER FALSE,
                DELIMITER '|',
                NULL '\\N'
            )""",
        ) as copy:
            for beer in beers:
                copy.write(
                    "|".join(
                        map(
                            clean_csv_value,
                            (
                                beer["id"],
                                beer["name"],
                                beer["tagline"],
                                parse_first_brewed(beer["first_brewed"]).isoformat(),
                                beer["description"],
                                beer["image_url"],
                                beer["abv"],
                                beer["ibu"],
                                beer["target_fg"],
                                beer["target_og"],
                                beer["ebc"],
                                beer["srm"],
                                beer["ph"],
                                beer["attenuation_level"],
                                beer["contributed_by"],
                                beer["brewers_tips"],
                                beer["volume"]["value"],
                            ),
                        )  # end map
                    )  # end join |
                )  # end write
                copy.write("\n")

But even the string_iterator method is not using the new copy object to its fullest potential. The clean_csv_value function that handles NULLs and stringifies column values is no longer necessary in psycopg: the copy object can take a tuple of all the data for a row and handle any None values for you. Furthermore, if you tell psycopg the data types of the destination columns, the library will use binary transfers under the covers where possible, saving the overhead of converting column values to strings.

I call this new flavor copy_tuple_iterator:

@profile
def copy_tuple_iterator(
    connection: psycopg.Connection,
    beers: list[dict[str, Any]],
) -> None:
    """Neither of the above methods is actually the best way to use modern psycopg:
    Let psycopg handle any nulls and deciding whether to send the value as a string
    or as a binary
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        with cursor.copy(
            """COPY staging_beers(
                  id,                  -- 1
                  name,                -- 2
                  tagline,             -- 3
                  first_brewed,        -- 4
                  description,         -- 5
                  image_url,           -- 6
                  abv,                 -- 7
                  ibu,                 -- 8
                  target_fg,           -- 9
                  target_og,           -- 10
                  ebc,                 -- 11
                  srm,                 -- 12
                  ph,                  -- 13
                  attenuation_level,   -- 14
                  contributed_by,      -- 15
                  brewers_tips,        -- 16
                  volume               -- 17
            ) FROM STDIN""",
        ) as copy:
            copy.set_types(
                (
                    "integer",  # 1
                    "text",  # 2
                    "text",  # 3
                    "date",  # 4
                    "text",  # 5
                    "text",  # 6
                    "numeric",  # 7
                    "numeric",  # 8
                    "numeric",  # 9
                    "numeric",  # 10
                    "numeric",  # 11
                    "numeric",  # 12
                    "numeric",  # 13
                    "numeric",  # 14
                    "text",  # 15
                    "text",  # 16
                    "integer",  # 17
                )
            )
            for beer in beers:
                copy.write_row(
                    (
                        beer["id"],  # 1
                        beer["name"],  # 2
                        beer["tagline"],  # 3
                        parse_first_brewed(beer["first_brewed"]),  # 4
                        beer["description"],  # 5
                        beer["image_url"],  # 6
                        beer["abv"],  # 7
                        beer["ibu"],  # 8
                        beer["target_fg"],  # 9
                        beer["target_og"],  # 10
                        beer["ebc"],  # 11
                        beer["srm"],  # 12
                        beer["ph"],  # 13
                        beer["attenuation_level"],  # 14
                        beer["contributed_by"],  # 15
                        beer["brewers_tips"],  # 16
                        beer["volume"]["value"],  # 17
                    ),
                )

When doing things the string-munging way, psycopg doesn’t offer a big gain over psycopg2. However, there is a definite performance improvement when using the “tuple” method. Plus, the tuple method leaves the string-hacking stuff to the library, which means my application code is simpler and easier to maintain.

Psycopg2 - 2019

copy_stringio()
Time   0.6274
Memory 99.109375

copy_string_iterator(size=1024)
Time   0.4596
Memory 0.0

Psycopg2 - 2023

copy_stringio()
Time   0.4525
Memory 9.765625

copy_string_iterator()
Time   0.3442
Memory 0.0

Psycopg - 2023

copy_stringio()
Time   0.49
Memory 16.1

copy_string_iterator()
Time   0.3413
Memory 0.01

copy_tuple_iterator()
Time   0.2397
Memory 0.07

A new option for data staging in 2023 - DuckDB

But what if we don’t want to stage inside of Postgres at all? DuckDB is becoming a favorite Swiss Army knife for combining data in flat files, in-memory dataframes, and (via the Postgres scanner) data already in a database. I can do a lot of data cleaning by bringing my query engine to my data rather than bringing my data to my query engine.
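
As a taste of what I mean, here is a rough sketch of the three kinds of sources in one session (it assumes the postgres extension is installed and a local database to scan; the connection string and names are illustrative):

import duckdb
import pandas as pd

con = duckdb.connect(":memory:")

# Flat files: query a parquet or json file directly by path.
con.sql("SELECT count(*) FROM 'beers.parquet'").show()

# In-memory dataframes: DuckDB's replacement scans find local pandas objects by name.
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
con.sql("SELECT * FROM df").show()

# Data already in Postgres, via the postgres scanner extension.
con.execute("INSTALL postgres")
con.execute("LOAD postgres")
con.sql(
    "SELECT * FROM postgres_scan('dbname=beers', 'public', 'staging_beers')"
).show()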

I decided to try just bulk copying data from json or parquet flat files to the same tables as the above tests.

First, let’s dump our data into some flat files:

import json

import duckdb

def save_beers_json(beers: Sequence[dict[str, Any]]) -> None:
    with open("beers.json", "w") as fp:
        fp.writelines(json.dumps(beer) + "\n" for beer in beers)

def save_beers_parquet() -> None:
    with duckdb.connect(":memory:") as ddb:
        ddb.sql("""SELECT * from 'beers.json' """).write_parquet('beers.parquet')

Next, the code to load the data is the same regardless of which flat file we are reading from. I used the copy method that was fastest in the previous experiment.

@profile
def copy_with_duckdb(db_conn: psycopg.Connection, whichfile: str) -> None:

    with duckdb.connect(":memory:") as ddb:

        with db_conn.cursor() as cursor:
            create_staging_table(cursor)

        # I'm doing the data cleaning in SQL instead of
        # python this time

        cleaned_beers = ddb.sql(f"""SELECT
            id,        -- 1
            name,      -- 2
            tagline,   -- 3

            case
                -- YYYY to YYYY-01-01
                when strlen(first_brewed) = 4
                then first_brewed || '-01-01'

                -- MM/YYYY to YYYY-MM-01
                -- 1234567
                when strlen(first_brewed) = 7
                then first_brewed[4:7] || '-' || first_brewed[1:2] || '-01'

                /* fallback result won't cast to a date, so we will fail
                 * on unexpected data
                 */
                when first_brewed is not NULL
                then 'unexpected data format ' || first_brewed

            end::date
                AS first_brewed,  -- 4

            description,   -- 5
            image_url,     -- 6
            abv,           -- 7
            ibu,           -- 8
            target_fg,     -- 9
            target_og,     -- 10
            ebc,           -- 11
            srm,           -- 12
            ph,            -- 13
            attenuation_level,  -- 14
            brewers_tips,       -- 15
            contributed_by,         -- 16
            volume.value AS volume  -- 17
        FROM '{whichfile}'""")

        with db_conn.cursor() as cursor:
            with cursor.copy("""
                COPY staging_beers(
                    id,             -- 1
                    name,           -- 2
                    tagline,        -- 3
                    first_brewed,   -- 4
                    description,    -- 5
                    image_url,      -- 6
                    abv,            -- 7
                    ibu,            -- 8
                    target_fg,      -- 9
                    target_og,      -- 10
                    ebc,            -- 11
                    srm,            -- 12
                    ph,             -- 13
                    attenuation_level,  -- 14
                    brewers_tips,       -- 15
                    contributed_by,     -- 16
                    volume              -- 17
                )
                FROM STDIN"""
            ) as copy:
                copy.set_types(
                    (
                        "integer",  # 1
                        "text",  # 2
                        "text",  # 3
                        "date",  # 4
                        "text",  # 5
                        "text",  # 6
                        "numeric",  # 7
                        "numeric",  # 8
                        "numeric",  # 9
                        "numeric",  # 10
                        "numeric",  # 11
                        "numeric",  # 12
                        "numeric",  # 13
                        "numeric",  # 14
                        "text",  # 15
                        "text",  # 16
                        "integer",  # 17
                    )
                )

                row = cleaned_beers.fetchone()
                while row is not None:
                    copy.write_row(row)
                    row = cleaned_beers.fetchone()

Psycopg + DuckDB Results

Considering that I’m adding additional file I/O and another data query engine to the workflow, it’s not surprising that it takes longer to do things this way. Working with the binary parquet file format is faster than working with the text json file. The copy from json implementation needs to deserialize the json data, which was not included in the timings of the other test cases where I passed in a list of pre-deserialized dictionaries.

I’m also not counting the work to write out the staged data to a file when setting up the test case, which would increase overhead as well. Considering that we took out network latency overhead from the other test cases, this seems fair.

By reusing the tables from the previous tests, I’m loading into an unlogged table. When using duckdb as a final data cleaning and transformation tool, you might be loading into a logged table, with index updates and other overhead.

Which is a long way of saying, it’s not clear how “good” these numbers are: It’s not a real world example of what I really use this tool for day-to-day. But for me, one of the appeals of DuckDB is how much easier it is to express certain types of data munging versus my other options. As long as the additional overhead is reasonable, I’m always a fan of things that are simple and expressive, and the results don’t seem unreasonable to me at this point.

copy_with_duckdb(whichfile=beers.json)
Time   0.77
Memory 41.5

copy_with_duckdb(whichfile=beers.parquet)
Time   0.3496
Memory 8.96

This blog post is getting super-long, but I will say that while playing with the DuckDB option, I discovered that what I select against makes quite a difference. Selecting directly from ‘whichfile’ had the least overhead, versus first creating a DuckDB view of my file (a little worse) or a DuckDB table (noticeably worse) and then selecting from that instead.
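
In shape, the three variants looked something like this sketch (using beers.parquet to stand in for whichfile):

import duckdb

with duckdb.connect(":memory:") as ddb:
    # 1. Select straight from the file: least overhead in my tests.
    direct = ddb.sql("SELECT count(*) FROM 'beers.parquet'")

    # 2. Wrap the file in a view first: a little worse.
    ddb.sql("CREATE VIEW beers_v AS SELECT * FROM 'beers.parquet'")
    via_view = ddb.sql("SELECT count(*) FROM beers_v")

    # 3. Materialize it into a DuckDB table first: noticeably worse.
    ddb.sql("CREATE TABLE beers_t AS SELECT * FROM 'beers.parquet'")
    via_table = ddb.sql("SELECT count(*) FROM beers_t")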

Conclusions

  1. You should move from psycopg2 to psycopg for doing bulk inserts.
  2. Use of execute and executemany does not change much, but calls to execute_values and execute_batch in psycopg2 will need to be converted to executemany in psycopg.
  3. The new psycopg copy object may work with minor changes versus the psycopg2 code, but the best performance will come from using the new copy object to its fullest potential. In addition, the improved copy API means your code may be simpler and easier to write and maintain.
  4. If you are staging your data with DuckDB, I’d love to hear about methods you’ve been using to profile the overhead or any performance tips and tricks! There is clearly more than one blog post to be written on this topic still…

Finally, if you’d like to play with this yourself, my source code is here.

A long documentation page worth reading


This meme floated across data engineering LinkedIn recently, and I have to say I disagree with throwing the docs into the fire here. It’s absolutely true that the reference page for pd.read_csv has an overwhelming number of options – I counted 52 (including the deprecated ones), but all those options mean that this function can handle just about any file you want to throw at it!

If you’re not going to memorize all of those options, here are some of the ones I use most frequently (I’ll pull a few of them together in a short example after the list):

  • usecols

    One easy way to speed up reading a csv file is to not process the columns you don’t need, rather than reading the whole file and dropping columns later. You can also designate one of the columns as the index with index_col (which, despite the singular name, can also take a list if you want a multicolumn index).

  • dtype

    Setting the datatypes of certain columns can save on memory or clarify whether a column represents a floating point, integer, or boolean value.

  • converters
  • on_bad_lines (especially setting it to a callback function)

    Writing your own per-column data converters and/or adding an on_bad_lines callback means you can load a file and do data cleaning or unit conversions in one step. The argument to your converter lambda function will be the raw string contents of the column, while the on_bad_lines callback will get a list of all the column values.

Many of the other options on that long documentation page are actually ways of doing simple conversions without having to type up your own lambda function: true_values and false_values convert values to booleans, and na_values marks values that are nulls/nans. Arguments parse_dates, infer_datetime_format, date_parser, dayfirst, and cache_dates control automatically converting columns to dates/times, and thousands and decimal control how strings are converted to numbers.

  • nrows
  • chunksize

    These options are useful for large files, either to get a sniff of the data or to process it in chunks. nrows will read a subset of the data, while chunksize will turn the output of read_csv into an iterator that yields the data in chunks:

    for chunk in pd.read_csv('my_file.csv', chunksize=10_000):
        process_next_chunk(chunk)
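
Pulling a few of these options together into one call (the file name and column names here are hypothetical):

import pandas as pd

df = pd.read_csv(
    "beers.csv",
    usecols=["id", "name", "abv", "first_brewed"],  # skip columns we don't need
    index_col="id",
    dtype={"abv": "float64"},
    converters={"first_brewed": lambda s: s[-4:]},  # keep just the year
    na_values=["N/A", ""],
    nrows=1_000,                                    # just a sniff of the file
)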
    

A powerful function for such a common operation is worth spending the time to understand in depth. If the reference page is too terse, there are more details and examples in the I/O guide here.