By Tony Zeljkovic

A deep dive into the Snowflake Python API

Developing data applications on your python data platform of choice.


Intro

When you’re working with Snowflake as a data engineer, chances are you will be developing applications and capabilities for your data platform on top of Snowflake.

At the very least, you will be interacting with third-party tools that integrate directly with Snowflake, such as data transformation tools like dbt, reporting tools like Looker, and data orchestrators such as Airflow.

What underlies all of these is interacting with the Snowflake platform programmatically. As such, we can view the development of Snowflake-based external applications as follows:


There is the Snowflake platform with its capabilities, which we can interface with in a few ways:

  • Through a UI such as Snowflake worksheets and Snowsight

  • Through the CLI interface SnowSQL

  • Through programmatic interfaces in various programming languages (to view all available connectors, check it out in the snowflake documentation).

Almost every external application, SaaS product or integration will use a programmatic interface or Snowflake connector in some way, shape or form to build out functionality.

An example of this is the data transformation tool dbt. dbt has a Snowflake integration, but under the hood it uses the Snowflake connector for Python to manage executing SQL commands against Snowflake.

Another example is the Snowflake plugin for VS Code, which uses the Node.js connector to execute various SQL commands based on UI input.

Considering this, understanding the programmatic interface with Snowflake allows you to:

  • Build custom data applications for your data platform

  • Understand and debug (snowflake) third party tools

  • Integrate Snowflake more natively with distributed computing on your data orchestrator

  • Build out capabilities that are not quite handled by higher level APIs such as Snowpark

  • Understand the fundamental principles of how snowflake handles queries, results and more.

The components of a snowflake connector: The python example


In this article we’ll be exploring the Python connector to understand Snowflake connectors in general. As each connector implements the same fundamentals, the Python example lets us understand the core principles of all connectors.

Fundamentally, there are two main components to control for in the connector:

  • The connection

  • The cursor

The connection configuration determines how you connect to snowflake from python.

The cursor is a database object used to retrieve, manipulate, and navigate through a snowflake result set. It’s used to send SQL instructions to snowflake as well as store results from any execute method performed.

The cursor has execution methods that determine what SQL operation we actually execute against Snowflake and how we process the result back into Python.

The main components of the Snowflake connector and how they interact with Snowflake.

Connection

This component dictates how the Python connector connects to your Snowflake account. The relevant Python class is SnowflakeConnection, which handles most of the configuration around connecting to Snowflake, and then some. There is a lot that can be set as of the current release, 3.10.1. Instead of going through every single setting, we will go through a few conceptual subcategories and cover the most important attributes you may or may not want to set. For a more extensive deep dive, feel free to refer to the source code of SnowflakeConnection.
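
As a minimal sketch, a connection is opened through snowflake.connector.connect(); the account, user and password values below are placeholders you would replace with your own, and password authentication is only one of the options discussed next.

import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>",  # e.g. myorg-myaccount
    user="<username>",
    password="<password>",
)

# The connection object is also the factory for cursors.
cur = conn.cursor()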

Defining security and authentication parameters

I will start with the security of Snowflake connections, as most settings around connectivity revolve in some way, shape or form around security. Considering the recent security breaches affecting several large organisations on the Snowflake platform, it is more relevant than ever to pay attention to this.

To connect to an account you will need the correct account identifier based on your authentication method. When connecting with a username, use the user attribute to define the snowflake username to log in with.

The most significant attribute is authenticator. This attribute determines what authentication method is used to authenticate to Snowflake. The overview below covers the currently supported first-party authentication methods you’ll probably use most, alongside some relevant details for each.

Username and password alone are vulnerable to various attacks such as phishing, brute force, and credential stuffing. Passwords can be weak, reused across multiple sites, and stolen in data breaches.

RSA key pairs are more secure than passwords because they use public key cryptography. The private key is kept secret and the public key is shared with Snowflake. However, the security relies heavily on the protection of the private key. If the private key is compromised or infrequently rotated, the security is significantly weakened.

Combining username and password with MFA significantly enhances security. Even if the password is compromised, the attacker would also need access to the second factor (such as a physical token, mobile app, or biometrics), making unauthorized access much more difficult. However, using a password still carries some of the weaknesses of general user+pass flows.

Single Sign-On (SSO) via Okta provides a higher level of security through centralized authentication. It often includes robust features like strong password policies, adaptive multi-factor authentication (MFA), and anomaly detection. The downside is that the security of the entire system depends on the security of the SSO provider, and these providers are prime targets for hacks. There may also be additional cost involved with these solutions, which are often more suited to enterprise users.
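
To make the options above concrete, here is a hedged sketch of key pair authentication and browser-based SSO. The key loading follows the pattern from the connector documentation; the file path, account and user values are placeholders.

import snowflake.connector
from cryptography.hazmat.primitives import serialization

# Key pair authentication: load the private key and pass its DER bytes.
with open("/path/to/rsa_key.p8", "rb") as key_file:
    private_key = serialization.load_pem_private_key(key_file.read(), password=None)

pkb = private_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    private_key=pkb,
)

# Browser-based SSO (for example via Okta) is selected through the authenticator attribute.
sso_conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    authenticator="externalbrowser",
)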

Chances are, if you work in a company of any decent size, there will be some sort of proxy or VPN solution used to connect to things. For proxies, it’s possible to set things up through the attributes proxy_host, proxy_port, proxy_user and proxy_password. For VPNs, configuration will vary from provider to provider, but usually it’s possible to set a CA certificate for the VPN of choice. The Snowflake connector uses Python’s standard certificate store (certifi), so a certificate can simply be appended as:

cat public_cert.crt >> $(python -m certifi)
pip config set global.cert $(python -m certifi)

We may delve deeper into these security protocols in the future, but the above should give you a rough idea of how to implement these common setups.
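
As a rough sketch of the proxy attributes mentioned above (host, port and credentials are placeholders; recent connector versions also honour the standard HTTPS_PROXY environment variable):

import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    password="<password>",
    proxy_host="proxy.mycompany.internal",
    proxy_port="8080",
    proxy_user="<proxy_username>",
    proxy_password="<proxy_password>",
)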


One last thing to note about security is that there are various options available such as insecure_mode, ocsp_fail_open, port and more. The best advice here is to never alter these values. If you find yourself needing to configure these sorts of settings to resolve connectivity issues, it’s a lot more prudent to find the actual root cause of your issue rather than downgrading the security of your application just to make a connection.


Defining database parameters

Within the connection configuration, there are several attributes that map to database components that you would typically use when interacting with snowflake.


Using database, schema, warehouse and role we can define the characteristics of our database connection. This will follow the role-based access control configuration in your Snowflake account.


If you want the Snowflake connector to check whether your combination of database parameters is valid for your Snowflake account, you can set validate_default_parameters to True, which will give you an error if you try to connect with incorrect database parameters.
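
As a small sketch, the database parameters are simply passed to the connect call; the object names below are placeholders and must exist under the chosen role for validation to pass.

import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    password="<password>",
    role="ANALYST",
    warehouse="TRANSFORM_WH",
    database="ANALYTICS",
    schema="REPORTING",
    validate_default_parameters=True,  # fail fast on typos in the names above
)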

Defining how API data is parsed in python

When interacting with snowflake, at the core we’re exchanging data over the web which means we will have some sort of serialization and deserialization happening between the client and server.


As such, there is a lot of “translation” happening between Snowflake API compatible formats and objects that are native to python.


There’s a few important parameters to understand here, though you may not need to alter them often.


The python snowflake connector will handle database exceptions with a custom exception class that inherits from python exceptions.


Essentially, this exception class will parse database errors from the API responses and convert them to python exceptions such that your python application can have native exceptions on database errors.


If you wish to change the parsing, formatting or other handling of errors, you can write your own class and override the default via the errorhandler attribute.


When it comes to converting snowflake API types to python types, this is handled by a SnowflakeConverter class. Simply put, these classes determine how to translate types from API format to python.


Generally this is handled pretty well but, in some cases, it may be useful to override this behavior when you are developing your data platform with libraries and data constructs that are not part of the standard library.


You may for example want to add additional type validation through pydantic. To overwrite the converter class, you can use converter_class in your connection config.
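
A hedged sketch of swapping in a custom converter is shown below. SnowflakeConverter and the converter_class parameter exist in the connector, but the exact conversion hooks you override (and any pydantic validation you add) depend on your connector version, so treat the subclass body as illustrative only.

import snowflake.connector
from snowflake.connector.converter import SnowflakeConverter


class ValidatingConverter(SnowflakeConverter):
    """Placeholder subclass where custom type handling/validation would live."""
    pass


conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    password="<password>",
    converter_class=ValidatingConverter,
)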


The Python Snowflake connector can return results in many different data types. We will dive a bit deeper into the different result data types later, but suffice to say that we can influence how results are returned to us through parameters in the connection class.


Two settings that come to mind are json_result_force_utf8_decoding, which forces the connector to decode all JSON strings as UTF-8 when returning JSON results, and client_prefetch_threads, which impacts how many threads are used to fetch the first set of results.


Again, we will get back to these in depth in a bit, but just be aware that some result parsing settings are configured at the connection level.
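
As a quick sketch, both settings are passed at connection time; the values below are examples, not recommendations.

import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    password="<password>",
    client_prefetch_threads=8,             # threads used to download the first result chunks
    json_result_force_utf8_decoding=True,  # decode JSON results strictly as UTF-8
)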



Defining session and timeout parameters

When connecting to snowflake to execute some SQL statements, we can view the time from start to end of the connection as a session. As such, it’s prudent to consider how we bound these sessions to prevent issues like hanging connections.


The most relevant parameters around this concept are timeouts. In the Snowflake connection configuration we can set a few different kinds of timeouts, such as login_timeout, which bounds how long we wait for authentication, and network_timeout, which caps the total time spent retrying failed network requests.


When we hit any of the above timeouts, we can set a backoff_policy to determine how long the connector will back off between connection attempts.

The Snowflake connector implements linear, exponential and mixed backoff functions, though it is possible to define your own. The default is a mixed backoff; this is to prevent the thundering herd problem, where many independent processes may try to perform API calls simultaneously, straining the available API bandwidth.

💡 Warehouse-level timeouts impact the Snowflake connector too, so be mindful of your overall timeout structure.

Finally, if we want to be able to distinguish in our Snowflake logs which application was connected to Snowflake through the API, we can use the application attribute to set a name for the application (default = "PythonConnector"). This can be useful if we have multiple types of applications connecting to Snowflake and we want to keep track of connections through the login history in the information schema.
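
A rough sketch of bounding a session and tagging the client might look as follows; the timeout values are arbitrary examples.

import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    password="<password>",
    login_timeout=30,               # give up on authentication after 30 seconds
    network_timeout=120,            # cap the total time spent retrying a request
    application="MY_ORCHESTRATOR",  # shows up in the login history instead of "PythonConnector"
)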

Cursor

Once a connection object is established we can derive a cursor object from it through connection.cursor(). A SnowflakeCursor will handle execution of various SQL commands against a snowflake database, as well as manage the retrieval and storage of the results of those commands.

As some configuration happens through attributes of the cursor object, some through the choice of execution method and some through arguments to those execution methods, it makes more sense to consider cursor configuration from the perspective of capabilities.

In the following sections we will consider how a SnowflakeCursor can be used to read/write data, execute queries synchronously or asynchronously, and how we can handle different result formats to process queries in a distributed, data-orchestrator-friendly fashion.

Synchronous and Asynchronous SQL Statements

There’s generally two ways to handle SQL statement execution with the snowflake API:

  1. Synchronous: The application will send the query for execution and wait for the results to be available before continuing the program. The results are therefore a python object representation of the results.

  2. Asynchronous: The application will send the query for execution and NOT wait for the results. The result is thus a query_id which can be evaluated later to retrieve results.

Generally, you’d want to use synchronous queries as they are easier to handle and cover most use cases. To execute synchronously, simply run something like:

conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")

Fetching results for async queries requires a little more work: your program should determine whether the query is still running and, once it has finished, whether it succeeded. Then you can proceed to retrieve results as usual.

This might look something like:

import time

from snowflake.connector.constants import QueryStatus

# ... send the query without waiting for results ...
cur = conn.cursor()
query_id = cur.execute_async(sql_statement).sfqid  # sql_statement: your SQL text
# ... poll until the query has finished ...
while conn.is_still_running(
    conn.get_query_status_throw_if_error(query_id)
):
    print(conn.get_query_status_throw_if_error(query_id))
    time.sleep(check_interval)  # check_interval: polling interval in seconds
if conn.get_query_status_throw_if_error(query_id) == QueryStatus.SUCCESS:
    resp = cur.get_results_from_sfqid(sfqid=query_id)
    # To work with the results, we need to explicitly fetch them first.
    cur.fetch_arrow_batches()
    print(
        f"Async query completed successfully and cursor ready for fetching. {resp}"
    )
else:
    raise ValueError(
        f"Query failed, status is {conn.get_query_status_throw_if_error(query_id)}"
    )

Note that we are checking the query status repeatedly until the query completes with either an error or a success status. Notice that running fetch_arrow_batches() will store the results and their metadata pointers in the cursor object itself.



Fetching data with the snowflake cursor

Before we get into the separate fetching methods for snowflake cursors, we’re going to take a deeper look into how the python connector fetches data from snowflake.

How snowflake fetches large datasets

When working with Snowflake, it's important to understand the mechanisms behind data fetching, especially for large datasets. Depending on the configuration, the API employs different processes to fetch data:

  • Small Datasets: The API can prefetch the entire response and load it directly into a Python object in memory. This is efficient and quick, making it ideal for smaller datasets.

  • Large Datasets: For larger datasets, the API can be configured to use a chunking method. Initially, a chunk of data is prefetched into memory, similar to the process for smaller datasets. Subsequently, the API fetches metadata pointers to the next chunks of data. This process is structured like a chain, with each chunk containing a pointer to the next chunk's location.

Note that the prefetch response and the chunk responses are returned from different URLs. The prefetch response is returned directly by the SQL endpoint the connector interacts with, https://<account_identifier>.snowflakecomputing.com/api. The chunk responses return pre-signed S3 URLs with a default lease of 6 hours.

What happens under the hood is that Snowflake has S3 resources specifically dedicated to your account/service user. Snowflake provides the chunks/partitions of your data through S3 pre-signed URLs tied to your service account id. These links have a shape similar to:


https://sfc-<server>-<data-center>-customer-stage.s3.<aws-cloud-region>/<service account id>/results/<statement_id>/main/data_<chunk_id>?<aws-s3-presigned-url-details>


Please do note that, from a security perspective, these links also need to be permitted by any VPN/proxy you might be using, and you must be very careful about logging them, as ANY outside party can use these links to access the chunk data within the lease time.

The following is a visual representation of what happens under the hood:


Essentially, when Snowflake processes your query, it stores the results as chunks in S3, and the iterator objects within the Snowflake connection object simply download each chunk through a pre-signed S3 URL into an in-memory object of your choice.


Internal python data types for cursor results

Single process fetching methods

With this type of method, of the form cursor.fetch_<type>(), the Snowflake cursor is able to fetch data from a synchronous or asynchronous query into several formats within a single process. Depending on the size of the data and program complexity, there are several possible methods.

Fetchall

cur.fetchall() is the most straightforward fetching method. When a statement result is fetched with this function, a list of tuples is generated where each tuple element corresponds to a column in the result set. For example:

# command
cmd = """
select *
From  (values ('Dummy1')
            ,('Dummy2')
    ) A(Dummies)
"""

cursor.execute(cmd)
results = cursor.fetchall()

# results
results: [
        (
            'Dummy1',
        ),
        (
            'Dummy2',
        ),
    ] (list) len=2

Be careful, however: this method will load all of the data directly into memory. It is NOT advised for queries where large result sets are expected. It can be quite useful when running SQL statements that can have variable outcomes, such as Data Control Language (DCL) statements.

Fetchone


cur.fetchone() is similar to the fetchall() method, but instead of fetching all the data into a list of tuples at once, it will fetch one record at a time. Running the same statement as above, we get:

results: (
    'Dummy1',
) (tuple) len=1

Fetchmany

cur.fetchmany(n) is similar to cur.fetchone(), but it returns the next n rows from the query result set. This can be useful for testing smaller subsets of the data before proceeding with the application code.
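
A small sketch of draining a result set in fixed-size slices with fetchmany(); the batch size of 100 is arbitrary and process_row() is a hypothetical placeholder for your own handling.

cursor.execute(cmd)
while True:
    rows = cursor.fetchmany(100)  # a list of up to 100 tuples
    if not rows:
        break
    for row in rows:
        process_row(row)          # hypothetical handler for a single record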

Fetch_arrow_all

cur.fetch_arrow_all() fetches the complete result set (the prefetched chunk plus any remaining chunks) and returns it as a single PyArrow Table. Again, do note that a fetch function using all will directly load everything into memory:

total_rows = 10000
cmd = """
select *
From  (values
"""
for i in range(total_rows):
    cmd += f"('Dummy{i}')"
    if i < total_rows-1:
        cmd += ",\n"
    else:
        cmd += "\n"
cmd += ") A(Dummies)"

cursor.execute(cmd)
results = cursor.fetch_arrow_all()

# results
results: (
    pyarrow.Table
    DUMMIES: string not null
    ----
    DUMMIES:
    [["Dummy0","Dummy1","Dummy2","Dummy3","Dummy4",...,"Dummy9995","Dummy9996","Dummy9997","Dummy9998","Dummy9999"]]
) (Table) len=10000

This method is useful if your application can handle the Arrow format well. The Arrow object is also pretty memory efficient; the above result object is roughly 14 kB.

FetchArrowBatches

Chances are, you sometimes have to query datasets that will be many times larger than the example above. If these datasets exceed the size of your memory allocation to the process, this will not suffice. For these situations, there is a set of fetching methods that use batches to retrieve data from a snowflake query.

Let’s say we generate a much larger dataset of 10,000 rows by 100 columns:

# Define the number of rows and columns
total_rows = 10000
total_columns = 100

# Start building the SQL command
cmd = "SELECT * FROM (VALUES\n"

# Iterate to create rows
for i in range(total_rows):
    # Generate values for each column
    values = ", ".join(f"'Dummy{i}_{j}'" for j in range(total_columns))
    cmd += f"({values})"
    if i < total_rows - 1:
        cmd += ",\n"
    else:
        cmd += "\n"

# Finish the SQL command
cmd += ") A(" + ", ".join(f"Col{j}" for j in range(total_columns)) + ")"

When we now fetch with cur.fetch_arrow_batches(), this produces a Python generator which yields 6 chunks of Arrow tables:

cursor.execute(cmd)
results = cursor.fetch_arrow_batches()

counter = 0
for batch in results:
    counter += 1
    print(counter)

# output
1
2
3
4
5
6

where each chunk will match the format of the fetch_arrow_all tables.

An important thing to note is that these generators cannot easily be passed around to other processes or tasks, which has implications for data orchestration tooling. While it is possible to compute each chunk in a for loop and submit it to separate tasks in your orchestrator, this is rather inefficient because you are passing large amounts of data into the inputs of your downstream tasks. For that purpose, there are ResultBatches, which are described in a later section.

FetchPandasAll and FetchPandasBatches

The Snowflake connector also supports fetching and converting data into pandas DataFrames. Running the same code as above, but for a table of 5x5, we get the following format of our results:

    results: <DataFrame({
        'COL0': <Series({
            0: 'Dummy0_0',
            1: 'Dummy1_0',
            2: 'Dummy2_0',
            3: 'Dummy3_0',
            4: 'Dummy4_0',
        })>,
        'COL1': <Series({
            0: 'Dummy0_1',
            1: 'Dummy1_1',
            2: 'Dummy2_1',
            3: 'Dummy3_1',
            4: 'Dummy4_1',
        })>,
        'COL2': <Series({
            0: 'Dummy0_2',
            1: 'Dummy1_2',
            2: 'Dummy2_2',
            3: 'Dummy3_2',
            4: 'Dummy4_2',
        })>,
        'COL3': <Series({
            0: 'Dummy0_3',
            1: 'Dummy1_3',
            2: 'Dummy2_3',
            3: 'Dummy3_3',
            4: 'Dummy4_3',
        })>,
        'COL4': <Series({
            0: 'Dummy0_4',
            1: 'Dummy1_4',
            2: 'Dummy2_4',
            3: 'Dummy3_4',
            4: 'Dummy4_4',
        })>,
    })> (DataFrame) len=5

The benefit of pandas DataFrames is obvious to any data professional who has been working with data and Python: it is much easier to do transformations, and pandas has an extensive API for doing so. The downside is a larger memory footprint. Similarly to fetch_arrow_batches(), we can call fetch_pandas_batches() to produce a generator of pandas DataFrames to work with, as sketched below.
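
A minimal sketch of streaming the result set as pandas DataFrames instead of loading it all at once; each batch roughly corresponds to one result chunk.

cursor.execute(cmd)
for df in cursor.fetch_pandas_batches():
    # Work on one DataFrame-sized chunk at a time, e.g. aggregate and discard.
    print(len(df), list(df.columns))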

Multi-process fetching: ResultBatches

A common situation when handling large amounts of data from a database is distributing the transformation workload across several worker nodes in a data orchestrator like Airflow or a big data framework like Spark.

The earlier fetching methods work fine when working with a single process but they don’t cover this use case very well.

When we want to integrate with systems like these, Snowflake provides a convenient result format called a ResultBatch, which we can access through the cursor's get_result_batches() method. Unlike generators, which are difficult to pass around in frameworks that heavily utilize Python futures (like Prefect) or where bandwidth is limited, these batches can be serialized with pickle.dumps into a small, efficient binary representation (think tens of kilobytes). This binary sequence can be passed around in most data orchestrators just fine.

When this binary representation is deserialized in a target process, the batch can then be materialized into any of the other representations.
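
As a hedged sketch of this pattern, the driver pickles the lightweight ResultBatch objects and each worker materializes its own batch; submit_to_worker() is a hypothetical stand-in for your orchestrator's task submission API.

import pickle

cursor.execute(cmd)
batches = cursor.get_result_batches()  # list of ResultBatch objects

for batch in batches:
    payload = pickle.dumps(batch)      # small binary blob, cheap to pass around
    submit_to_worker(payload)          # hypothetical task submission


# Inside a worker process:
def worker(payload: bytes):
    batch = pickle.loads(payload)
    df = batch.to_pandas()             # or batch.to_arrow()
    return df.shape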


Loading data into snowflake through the python connector

There are many ways to load data into Snowflake, but for the purposes of this article I will only briefly describe the three main methods when using the Snowflake connector for Python:

  1. It’s possible to do a standard cycle of loading data into S3, connecting this to a Snowflake stage and then issuing a COPY INTO command through the cursor.

  2. The cursor’s execute() method can be extended with a file_stream parameter to push a Python IO object directly to Snowflake. This is useful for larger-than-memory, non-dataframe objects.

  3. The Snowflake connector library also has a submodule called pandas_tools, which contains a function called write_pandas(). This can be used to write a pandas DataFrame directly to Snowflake; it auto-generates and runs the statements you would typically run to implement the first method (see the sketch below).
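
A minimal sketch of the third method; write_pandas() handles the staging and COPY INTO for you, and the DataFrame contents and table name below are placeholders.

import pandas as pd
from snowflake.connector.pandas_tools import write_pandas

df = pd.DataFrame({"DUMMIES": ["Dummy1", "Dummy2"]})

success, nchunks, nrows, _ = write_pandas(
    conn,
    df,
    table_name="DUMMY_TABLE",
    auto_create_table=True,  # create the target table if it does not exist yet
)
print(success, nchunks, nrows)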

Closing remarks

As an official snowflake integration partner, Zelytic helps organizations to set up their snowflake infrastructure.

If your company is in need of snowflake consultation services, Zelytics offers a complimentary consultation to help you gain clarity around your main challenges and develop a data-driven strategy to overcome them.

Let’s talk and get to know each other and see what we can do for your business. 


Schedule a free consultation today.
