Connections / Engines

How do I configure logging?

Configuring Logging を参照してください。

How do I pool database connections? Are my connections pooled?

SQLAlchemyはほとんどの場合、アプリケーションレベルの接続プールを自動的に実行します。含まれているすべてのダイアレクト(「メモリ」データベースを使用する場合のSQLiteを除く)では、 Engine オブジェクトは接続元として QueuePool を参照します。

詳細は Engine ConfigurationConnection Pooling を参照してください。

How do I pass custom connect arguments to my database API?

create_engine() 呼び出しは、追加の引数を``connect_args``キーワード引数経由で直接受け取ります:

e = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)

また、基本的な文字列と整数の引数の場合、通常はURLのクエリ文字列で指定できます。:

e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")

“MySQL Server has gone away”

このエラーの主な原因は、MySQL接続がタイムアウトになり、サーバーによって閉じられたことです。MySQLサーバーは、デフォルトで8時間の間アイドル状態にあった接続を閉じます。これに対応するために、 create_engine.pool_recycle 設定を有効にします。これにより、設定された秒数よりも古い接続は破棄され、次にチェックアウトされたときに新しい接続に置き換えられます。

データベースの再起動や、その他のネットワークの問題による一時的な接続の喪失に対応するより一般的なケースでは、プール内の接続は、より一般的な切断検出技術に応じて再利用されることがあります。Dealing with Disconnects セクションでは、”悲観的”」(例:pre-ping)と”楽観的”(例:グレースフルリカバリ)の両方の技術の背景を説明しています。最近のSQLAlchemyは”悲観的”なアプローチを好む傾向があります。

“Commands out of sync; you can’t run this command now” / “This result object does not return rows. It has been closed automatically”

MySQLドライバには、サーバーへの接続の状態が無効な状態になるさまざまな種類のエラーモードがあります。通常、接続が再度使用されると、これらの2つのエラーメッセージのいずれかが表示されます。これは、サーバーの状態が、クライアントライブラリが予期しない状態に変更されたためです。たとえば、クライアントライブラリが接続に対して新しいステートメントを発行すると、サーバーは予期したとおりに応答しません。

SQLAlchemyでは、データベース接続がプールされているので、接続上のメッセージが同期していないという問題がより重要になります。なぜなら、操作が失敗した場合、接続自体が使用できない状態にある場合、接続が接続プールに戻った場合、再度チェックアウトされたときに誤動作するからです。この問題の緩和策は、そのような失敗モードが発生したときに接続が 無効化 され、MySQLへの基礎となるデータベース接続が破棄されることです。この無効化は、多くの既知の失敗モードに対して自動的に発生し、 Connection.invalidate() メソッドを介して明示的に呼び出すこともできます。

また、このカテゴリーには失敗モードの2番目のクラスがあります。このクラスでは、エラーが発生したときに、 with session.begin_nested(): のようなコンテキストマネージャがトランザクションを”ロールバック”しようとしますが、接続のいくつかの失敗モードでは、ロールバック自体(RELEASE SAVEPOINT操作の場合もあります)も失敗し、誤ったスタックトレースを引き起こします。

当初、このエラーの原因は非常に単純で、マルチスレッド・プログラムが複数のスレッドから単一の接続でコマンドを呼び出すことを意味していました。これは、使用されている唯一のドライバであった元の”MySQLdb”ネイティブCドライバに適用されました。しかし、PyMySQLやMySQL-connector-Pythonのような純粋なPythonドライバの導入や、gevent/eventlet、マルチプロセッシング(多くはCeleryを使用)などのツールの使用の増加により、この問題を引き起こすことが知られている一連の要因があり、そのいくつかはSQLAlchemyのバージョン間で改善されていますが、避けられないものもあります。

  • スレッド間の接続の共有 - これが、この種のエラーが発生した本来の理由です。プログラムが2つ以上のスレッドで同時に同じ接続を使用しました。つまり、複数のメッセージセットが接続上で混同され、サーバ側のセッションがクライアントが解釈できない状態になりました。しかし、今日では通常、他の原因が考えられます。

  • プロセス間の接続のためのファイルハンドルの共有 - これは通常、プログラムが新しいプロセスを生成するために os.fork() を使用し、親プロセスに存在するTCP接続が1つ以上の子プロセスに共有される場合に発生します。複数のプロセスが本質的に同じファイルハンドルにメッセージを送信しているので、サーバはインターリーブされたメッセージを受信し、接続の状態を中断します。

    このシナリオは、プログラムがPythonの”マルチプロセッシング”モジュールを使用し、親プロセスで作成された Engine を使用する場合に非常に簡単に発生します。Celeryのようなツールを使用するときには、”マルチプロセッシング”が使用されるのが一般的です。正しいアプローチは、子プロセスが最初に開始されたときに、親プロセスから派生した Engine を破棄して新しい Engine を生成するか、親プロセスから継承された Engine が 、Engine.dispose() を呼び出すことによって破棄された接続の内部プールを持つことができるかのいずれかです。

  • Greenlet Monkeypatching w/Exits - PythonネットワークAPIをモンキーパッチするgeventやeventletなどのライブラリを使用する場合、PyMySQLなどのライブラリは、このモデルに対して明示的に開発されていないにもかかわらず、非同期モードで動作するようになりました。よくある問題は、アプリケーションのタイムアウトロジックが原因で、グリーンスレッドが中断されることです。この結果、 GreenletExit 例外が発生し、pure-Python MySQLドライバの作業が中断されます。これは、サーバからの応答を受信したか、接続の状態をリセットする準備をしていた可能性があります。例外によってすべての作業が中断されると、クライアントとサーバ間の通信が同期されなくなり、その後の接続の使用が失敗する可能性があります。バージョン1.1.0のSQLAlchemyはこれを防ぐ方法を知っています。データベース操作がいわゆる GreenletExit によって中断されたかのように、 GreenletExitException のサブクラスではないPythonの BaseException のサブクラスによって中断された場合、接続は無効になります。

  • Rollbacks / SAVEPOINT releases failing - 一部のクラスのエラーにより、トランザクションのコンテキスト内および”SAVEPOINT”ブロックでの操作時に接続が使用できなくなります。これらの場合、接続の失敗によりSAVEPOINTは存在しなくなりますが、SQLAlchemyまたはアプリケーションがこのセーブポイントを”ロールバック”しようとすると、”RELEASE SAVEPOINT”操作は失敗し、通常”savepoint does not exist”のようなメッセージが表示されます。この場合、Python 3では、エラーの最終的な”原因”も表示される例外出力のチェーンがあります。Python 2では、”チェーン”例外はありませんが、最近のバージョンのSQLAlchemyでは、元の失敗の原因を示す警告を出力しようとしますが、ROLLBACKの失敗である即時エラーがスローされます。

How Do I “Retry” a Statement Execution Automatically?

ドキュメントのセクション Dealing with Disconnects では、特定の接続が最後にチェックアウトされてから切断された、プールされた接続に使用できる方法について説明しています。この点で最も現代的な機能は create_engine.pre_ping パラメータです。このパラメータを使用すると、データベース接続がプールから取得されたときに”ping”を送信し、現在の接続が切断されている場合に再接続できます。

この”ping”は、接続が実際に操作に使用される にのみ送信されることに注意してください。接続が呼び出し元に配信されると、Python DBAPI 仕様に従って、autobegin 操作の対象になります。つまり、最初に使用されたときに自動的に新しいトランザクションを開始し、DBAPIレベルの connection.commit() または connection.rollback() メソッドが呼び出されるまで、後続の文に対して有効になります。

最近のSQLAlchemyの使用法では、 DBAPI autocommit mode が有効になっていない(これについては次のセクションで詳しく説明します)と仮定して、このトランザクション状態では常に一連のSQL文が呼び出されます。つまり、1つの文も自動的にコミットされません。操作が失敗すると、現在のトランザクション内のすべての文の効果が失われます。

これがステートメントの”再試行”という概念に対して持つ意味は、デフォルトの場合、接続が失われると、 トランザクション全体が失われる ということです。データはすでに失われているため、データベースが”再接続して再試行”し、中断した場所から続行できる便利な方法はありません。このため、SQLAlchemyには、データベース接続が使用中に切断された場合に、トランザクションの途中で機能する透過的な”再接続”機能がありません。操作の途中での切断を処理するための標準的なアプローチは、 トランザクションの最初から操作全体を再試行する ことです。これは、多くの場合、特定の関数が成功するまで何度も”再試行”するカスタムPythonデコレータを使用するか、または、操作が失敗する原因となるドロップされたトランザクションに対して回復力のある方法でアプリケーションを設計することによって行われます。

また、トランザクション内で処理されたすべてのステートメントを追跡し、”再試行”操作を近似するために新しいトランザクションでそれらすべてを再実行できる拡張の概念もあります。SQLAlchemyの event system では、そのようなシステムを構築することができますが、このアプローチも一般的には有用ではありません。なぜなら、これらの DML ステートメントが同じ状態に対して動作することを保証する方法がないからです。なぜなら、トランザクションが終了すると、新しいトランザクションでのデータベースの状態はまったく異なる可能性があるからです。アプリケーション・レベルのトランザクション・メソッドは、ステップを再実行する方法を最もよく知っているメソッドだからです。

そうでなければ、SQLAlchemyがトランザクションの途中で接続を透過的かつ暗黙的に”再接続”する機能を提供した場合、その結果、データが暗黙的に失われることになります。

ただし、トランザクションを 使用しない 場合は、次のセクションで説明するように、さらに多くのオプションを使用できます。

Using DBAPI Autocommit Allows for a Readonly Version of Transparent Reconnect

透過的な再接続メカニズムを持たないことの理論的根拠を述べた上で、前節はアプリケーションが実際にDBAPIレベルのトランザクションを使用しているという前提に基づいています。ほとんどのDBAPIが native”autocommit”settings を提供するようになったので、これらの機能を利用して、 read only,autocommit only operations のための限定された形式の透過的な再接続を提供することができます。透過的な文の再試行はDBAPIの cursor.execute() メソッドに適用できますが、DBAPIの cursor.executemany() メソッドに適用するのはまだ安全ではありません。なぜなら、その文は与えられた引数の任意の部分を消費している可能性があるからです。

Warning

次のレシピは、データを書き込む操作に 使用しないでください 。ユーザーは、このレシピを本番環境で使用する前に、レシピの仕組みを注意深く読み、理解し、特に対象となるDBAPIドライバに対して障害モードを非常に注意深くテストする必要があります。再試行メカニズムは、すべての場合において切断エラーの防止を保証するものではありません。

DialectEvents.do_execute() フックと DialectEvents.do_execute_no_params() フックを利用することで、DBAPIレベルの cursor.execute() メソッドに簡単な再試行メカニズムを適用することができます。これにより、文の実行中に切断をインターセプトすることができます。結果セットを完全にバッファしないDBAPIでは、結果セットのフェッチ操作中の接続エラーをインターセプト しません 。このレシピでは、データベースがDBAPIレベルのオートコミットをサポートしている必要があり、特定のバックエンドに対して 保証されていません 。単一の関数 reconnecting_engine() が提供されます。この関数は、指定された Engine オブジェクトにイベントフックを適用し、DBAPIレベルのオートコミットを有効にするalways-autocommitバージョンを返します。単一パラメータおよびパラメータなしの文の実行では、接続は透過的に再接続されます:

import time

from sqlalchemy import event

def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

上記のレシピが与えられれば、以下の概念実証スクリプトを使用して、トランザクション中の再接続を示すことができる。一度実行すると、5秒ごとに SELECT 1 文がデータベースに出力されます:

from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

スクリプトの実行中にデータベースを再起動して、transparentreconnect操作を示します。:

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上記のレシピは、SQLAlchemy 1.4でテストされています。

Why does SQLAlchemy issue so many ROLLBACKs?

SQLAlchemyは現在、DBAPI接続が”非オートコミット”モードであることを想定しています。これはPythonデータベースAPIのデフォルトの動作であり、トランザクションが常に進行中であることを想定する必要があります。接続プールは、接続が返されたときに connection.rollback() を発行します。これは、接続に残っているトランザクションリソースが解放されるようにするためです。テーブルリソースが積極的にロックされているPostgreSQLやMSSQLのようなデータベースでは、これは使用されなくなった接続内で行やテーブルがロックされたままにならないようにするために重要です。そうでなければ、アプリケーションがハングする可能性があります。ただし、これはロックのためだけではなく、MySQL withInnoDBを含むあらゆる種類のトランザクション分離を持つデータベースでも同様に重要です。古いトランザクション内にある接続は、そのデータが分離内の接続ですでに照会されている場合、そのデータを返します。MySQLでも古いデータが表示される理由の背景については、 https://dev.MySQL.com/doc/refman/5.1/en/innodb-transaction-model.html を参照してください。

I’m on MyISAM - how do I turn it off?

接続プールから返される接続の振る舞いは、 reset_on_return を使って設定できます。:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)

I’m on SQL Server - how do I turn those ROLLBACKs into COMMITs?

reset_on_return は、 TrueFalseNone に加えて、 commitrollback の値を受け入れます。 commit に設定すると、接続がプールに返されるときにCOMMITが発生します。:

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

I am using multiple connections with a SQLite database (typically to test transaction operation), and my test program is not working!

SQLiteの :memory: データベースを使用している場合、デフォルトの接続プールは SingletonThreadPool です。これは、スレッドごとに正確に1つのSQLite接続を保持します。したがって、同じスレッドで使用されている2つの接続は、実際には同じSQLite接続になります。エンジンが QueuePool (現在のSQLAlchemyバージョンの非メモリデータベースのデフォルト)を使用するように、:memory: databaseを使用していないことを確認してください。

See also

Threading/Pooling Behavior - info on PySQLite’s behavior.

How do I get at the raw DBAPI connection when using an Engine?

通常のSAエンジンレベルのConnectionでは、DBAPI接続のpool-proxiedversionを ConnectionConnection.connection 属性経由で取得できます。実際のDBAPI接続では、その上の PoolProxiedConnection.dbapi_connection 属性を呼び出すことができます。通常の同期ドライバでは、プールでプロキシされていないDBAPI接続にアクセスする必要はありません。これは、すべてのメソッドが以下を介してプロキシされるためです:

engine = create_engine(...)
conn = engine.connect()

# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

Changed in version 1.4.24: PoolProxiedConnection.dbapi_connection 属性が追加されました。これは、以前の PoolProxiedConnection.connection 属性に代わるもので、現在も使用可能です。この属性は、常にpep-249同期スタイルの接続オブジェクトを提供します。 PoolProxiedConnection.driver_connection 属性も追加されました。この属性は、どのようなAPIが提供されているかに関係なく、常に実際のドライバレベルの接続を参照します。

Accessing the underlying connection for an asyncio driver

asyncioドライバが使用されている場合、上記のスキームに2つの変更があります。1つ目は、 AsyncConnection を使用している場合、awaitableメソッド AsyncConnection.get_raw_connection() を使用して PoolProxiedConnection にアクセスする必要があることです。この場合に返される PoolProxiedConnection はsync-stylepep-249の使用パターンを保持し、 PoolProxiedConnection.dbapi_connection 属性はasyncioconnectionをsyncスタイルのpep-249 APIに適応させるSQLAlchemyに適応した接続オブジェクトを参照します。言い換えれば、asyncioドライバを使用している場合には 2 レベルのプロキシが行われます。実際のasyncio接続は driver_connection 属性から利用できます。前の例をasyncioに関して言い直すと、次のようになります:

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

asyncioドライバを使用する場合、上記の”DBAPI”接続は、実際にはSQLAlchemyに適応した形式の接続であり、synchronous-stylepep-249スタイルのAPIを提供します。使用中のドライバの元のasyncio APIを提供する実際のasyncioドライバ接続にアクセスするには、 PoolProxiedConnectionPoolProxiedConnection.driver_connection 属性を介してアクセスできます。標準のpep-249ドライバの場合、 PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection は同義です。

接続をプールに戻す前に、その接続の独立性レベルの設定またはその他の操作固有の設定を正常に戻す必要があります。

設定を元に戻す代わりに、 Connection またはプロキシされた接続のいずれかで Connection.detach() メソッドを呼び出すことができます。このメソッドは、 Connection.close() が呼び出されたときに閉じられて破棄されるように、プールから接続の関連付けを解除します。

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?

これについては Using Connection Pools with Multiprocessing or os.fork() 節で説明します。