.net | SQL Distributed Transaction

问题背景

一年前,我们开始利用.Net 4.0的TPL(Task Parallel Library)并行计算技术对复杂计算的功能节点进行性能优化,这些复杂计算往往会包含大量对数据库的操作。在应用TPL时我们发现,如果每个Task都开启独立事务(RequireNew)的话,那么一切工作正常。但是,如果每个Task需要与父线程工作于同一个事务中(Required),则多线程并行计算时会经常性地抛出“其他会话正在使用事务的上下文”的错误(Transaction context in use by another session)。

 

在解决这个问题的过程中,我们惊讶地发现MSDN中所有关于TPL的代码示例,竟然完全没有涉及数据库操作的示例。从互联网上搜索,没有找到问题的解决方案:

1. How do distributed transactions behave with multiple connections to the same DB in a threaded environment?

2. TransactionScope not working with Parallel Extensions?

3. Error: Transaction context in use by another session

4. System.Transaction may fail in multiple-thread environment

 

根据KB279857的描述:

When this occurs, there are two different SPIDs in a single DTC transaction, and both connections/SPIDs are connected to the same server. In SQL Server, it is not permitted for two connections in the same transaction to execute a query against the same SQL instance concurrently

似乎表明SQLServer不支持同一个事务中有两个连接同时操作数据库。事实真相是否真是如此?答案是否定的。

初步分析

Ado.net让子线程与父线程复用同一个事务的技术是DependentTransaction,由调用Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete)创建,可以确保父线程在提交事务时自动等待所有子线程的事务提交完成。

 

为简化问题的分析,我们没有使用TPL,而是使用最原始的多线程编程方式,目的是确认DependentTransaction的能力。案例与MSDN上关于DependentTransaction的示例相同:

http://msdn.microsoft.com/en-us/library/ms229976(v=vs.90).aspx

注意测试环境需要首先确保DTC配置正确,网上反馈的一些TransactionAbortedException场景,实际是DTC没有配置正确导致的。多线程下,多个数据库连接无论访问的是否为同一个数据库,必然是DTC事务模式。这个概念必须首先建立,这是后续研究工作的基础。

 

我们测试的结果表明,MSDN的示例仅在只有一个子线程的情况下有效。如果有多于1个子线程,则必然会出现错误。测试过程中,最常出现的异常是Transaction context in use by another session,其次是The operation is not valid for the state of the transaction异常。从SqlProfiler跟踪看,前一种异常是Propagate Transaction已经成功了,后一种异常则处于不能Propagate Transaction的情况(但Get address已成功)。

 

查看Ado.net的代码,出现异常的位置是在SqlInternalConnection的EnlistNonNull方法中:

image

其中,GetTransactionCookie方法会触发The operation is not valid for the state of the transaction异常,而PropagateTransactionCookie会触发Transaction context in use by another session异常。

开启微软技术支持case

鉴于解决这个问题对我们具有重要价值,我们开启了一个微软技术支持case。技术支持人员倾向于认为这是一个SQLServer已知的限制,但这很难说服自己,原因主要有两点:

1. 首先,我们可以反过来推理,如果这个限制存在,那设计DependentTransaction的意义和价值何在?从MSDN关于DependentTransaction的描述看,明确指示适用于多个worker thread的场景:
image

2. 其次,也是我认为最不能接受的,是测试结果表明并非所有的测试循环都会失败,总会有大约30%的循环可以成功完成。如果所有任务全部失败,那没有问题,这就是SQLServer或Ado.net的限制。但如果有成功跑完嵌套事务的循环,那说明什么?说明更大可能是存在部分并发冲突的场景,而机制本身并不存在根本性的问题。

 

最终在微软朋友的帮助下,联系到了SQLTeam的核心开发人员,给出了非常专业的问题原因描述及WorkAround的解决方案:

1. If a transaction needs to be promoted from a delegated transaction to a distributed transaction (this happens when the second connection enlists in the transaction), then ADO.Net needs to re-use the original connection to instruct the SQL Server to promote the transaction. However, if that original connection already has an open SqlDataReader or is in the process of executing a command then it will throw an exception (SqlException: “There is already an open DataReader associated with this Command which must be closed first.”).
Workaround: To avoid this, the customer can pre-promote the transaction using TransactionInterop.GetTransmitterPropagationToken(see http://blogs.msdn.com/b/florinlazar/archive/2007/02/08/when-transaction-promotion-goes-bad.aspx for more details)

2. If a transaction is in the process of being promoted, then checking its current state (which ADO.Net does to ensure that the transaction is still active before enlisting) will throw an exception (TransactionException: “The operation is not valid for the state of the transaction.”)
Workaround: Again, this can be avoided by pre-promoting the transaction

3. The SQL Server will only permit a connection to enlist in a transaction if nothing else is running within the scope of the transaction (this can either be another connection enlisting in the transaction or executing a command), otherwise it will throw an error (SqlException: “Transaction context in use by another session.”)
Workaround: The only workaround for this is to retry the open if you receive the above SqlException (the easiest way to detect this is to check if the Number property of the SqlException is 3910)

 

上面这段话我来解释一下:

1. 当一个事务有第2个SqlConnection加入时,需要从本地事务提升为DTC事务。此时,Ado.net会首先使用事务中的第1个SqlConnection(original connection)来通知SQLServer对事务进行升级。如果此时original connection已关联到一个打开的SqlDataReader,则会抛出异常信息“There is already an open DataReader associated with this Command which must be closed first”,类型为SqlException。

2. 若事务正处于从本地事务提升为DTC事务的过程中,则此时任何获取事务状态的操作会无条件地抛出异常信息“The operation is not valid for the state of the transaction” ,类型为TransactionException。在多线程并行运行的场景,必然存在大量出现获取事务状态的操作。

3. SQLServer任何时候只允许将一个SqlConnection征募(enlist)到事务中。如果在征募过程中,发现该事务有其它SqlConnection也正处于征募状态,或正在执行SQL命令,则会抛出异常信息“Transaction context in use by another session”,类型为SqlException。

4. 异常1和异常2的解决方案是预先将本地事务提升为DTC事务,通过调用TransactionInterop.GetTransmitterPropagationToken实现。详细信息可参见博客文章http://blogs.msdn.com/b/florinlazar/archive/2007/02/08/when-transaction-promotion-goes-bad.aspx

5. 解决异常3没有太好的方案,只能是在打开SqlConnection时捕获异常(SqlException.Number=3910),然后尝试重新打开SqlConnection。这个方案不完美,原因一方面是异常本身的开销,另一方面是为了避免频繁冲突,一般会选择sleep一个短时间再重试,这又多了额外的线程调度开销。Sleep多长时间合适?同样很难抉择。理论上,SqlServer引擎可以扩展,自动消除这个限制。

 

以上种种问题,其实都发生在DTC事务升级过程中。如果所有SqlConnection都成功征募到事务中,并且事务已成功升级为DTC事务,则后面无论怎样执行SQL Command,都没有问题。这印证了我们当初怀疑多线程复用同一个事务,存在一些并发场景的限制是正确的。

总结

一旦解决了多线程复用同一个事务的问题,并行计算就可以应用嵌套事务的模式。由于Workarond方案的额外开销只发生在DTC事务升级过程中,相对而言是很轻量的,解决方案非常令人满意。尽管DTC事务相比本地事务会有性能上的下降,但在实际应用过程中,大量复杂功能节点经并行计算技术改造,往往取得数量级的优化效果,尽显点石成金的魔力。DTC事务的额外开销,完全可以忽略不计。毕竟现在客户使用的服务器,CPU核数往往是24核、32核这样的级别。

 

到目前为止,尚未看到微软给出明确的官方解决方案,因此本文涉及的技术尚属于未公开的内幕,存在一定的技术风险,特此提醒一下。从个人的角度看,如果并行计算只能使用独立事务模式,而不能使用嵌套事务模式的话,那并行计算技术应用的场景将受到极大的限制,并行计算的价值也就无从充分发挥。

 

最后,感谢一下微软朋友们的帮助和支持!

附录

测试案例的源代码如下(Workarond方案涉及的代码标记为红色):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Transactions;
using System.Data.SqlClient;
using System.Threading;

namespace DTCTest
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                for (int i = 0; i < 10000; i++)
                    TestMSDNSample();

                System.Console.WriteLine("test done");
            }
            catch (Exception e)
            {
                System.Console.WriteLine("test fail: " + e.Message);
            }
        }

        static public void TestMSDNSample()
        {
            TransactionOptions options = new TransactionOptions();
            options.IsolationLevel = IsolationLevel.ReadCommitted;

            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, options))
            {
                // 预先升级为DTC事务
                TransactionInterop.GetTransmitterPropagationToken(Transaction.Current);

                Transaction currentTransaction = Transaction.Current;
                WorkerThread workerThread = new WorkerThread();
                workerThread.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread1");
                WorkerThread workerThread2 = new WorkerThread();
                workerThread2.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread2");
                WorkerThread workerThread3 = new WorkerThread();
                workerThread3.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread3");
                WorkerThread workerThread4 = new WorkerThread();
                workerThread4.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread4");
                WorkerThread workerThread5 = new WorkerThread();
                workerThread5.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread5");
                /* Do some transactional work here, then: */
                scope.Complete();
            }
        }
    }

    public class WorkerThread
    {
        public static string Connection_String = "user id=sa;data source=int3;initial catalog=msdb;password=ufsoft";//MultipleActiveResultSets=true";
        public const string Run_SQL = "update MSdbms set Version='aaa'";

        public void DoWork(DependentTransaction dependentTransaction, string threadName)
        {
            Thread thread = new Thread(ThreadMethod);
            // thread name for debug
            thread.Name = threadName;
            thread.Start(dependentTransaction);
        }

        public void ThreadMethod(object transaction)
        {
            Console.WriteLine("ThreadName=" + Thread.CurrentThread.Name);

            DependentTransaction dependentTransaction = transaction as DependentTransaction;
            try
            {
                using (TransactionScope ts = new TransactionScope(dependentTransaction))
                {
                    /* Perform transactional work here */
                    SqlConnection sqlConn = new SqlConnection(Connection_String);
                    OpenConn(sqlConn);
                    try
                    {
                        SqlCommand cmd = sqlConn.CreateCommand();
                        cmd.CommandType = System.Data.CommandType.Text;
                        cmd.CommandText = Run_SQL;
                        cmd.ExecuteNonQuery();
                        Thread.Sleep(10);
                        cmd.ExecuteNonQuery();
                        ts.Complete();
                    }
                    finally
                    {
                        sqlConn.Close();
                    }
                }
                dependentTransaction.Complete();
            }
            catch (Exception e)
            {
                dependentTransaction.Rollback() ;
                Console.WriteLine(String.Format("{0} has an exception raised: {1}", Thread.CurrentThread.Name, e.Message));
            }
            finally
            {
                dependentTransaction.Dispose();
            }
        }

        private void OpenConn(SqlConnection conn)
        {
            while (true)
            {
                try
                {
                    conn.Open();
                    break;
                }
                catch (SqlException ex)
                {
                    Console.WriteLine(ex.Message);
                    if (ex.Number == 3910)
                    {
                        Thread.Sleep(10);
                        continue;
                    }
                }
            }
        }
    }
}
Comments