问题背景一年前,我们开始利用.Net 4.0的TPL(Task Parallel Library)并行计算技术对复杂计算的功能节点进行性能优化,这些复杂计算往往会包含大量对数据库的操作。在应用TPL时我们发现,如果每个Task都开启独立事务(RequireNew)的话,那么一切工作正常。但是,如果每个Task需要与父线程工作于同一个事务中(Required),则多线程并行计算时会经常性地抛出“其他会话正在使用事务的上下文”的错误(Transaction context in use by another session)。
在解决这个问题的过程中,我们惊讶地发现MSDN中所有关于TPL的代码示例,竟然完全没有涉及数据库操作的示例。从互联网上搜索,没有找到问题的解决方案:
根据KB279857的描述:
似乎表明SQLServer不支持同一个事务中有两个连接同时操作数据库。事实真相是否真是如此?答案是否定的。 初步分析Ado.net让子线程与父线程复用同一个事务的技术是DependentTransaction,由调用Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete)创建,可以确保父线程在提交事务时自动等待所有子线程的事务提交完成。
为简化问题的分析,我们没有使用TPL,而是使用最原始的多线程编程方式,目的是确认DependentTransaction的能力。案例与MSDN上关于DependentTransaction的示例相同:
注意测试环境需要首先确保DTC配置正确,网上反馈的一些TransactionAbortedException场景,实际是DTC没有配置正确导致的。多线程下,多个数据库连接无论访问的是否为同一个数据库,必然是DTC事务模式。这个概念必须首先建立,这是后续研究工作的基础。
我们测试的结果表明,MSDN的示例仅在只有一个子线程的情况下有效。如果有多于1个子线程,则必然会出现错误。测试过程中,最常出现的异常是Transaction context in use by another session,其次是The operation is not valid for the state of the transaction异常。从SqlProfiler跟踪看,前一种异常是Propagate Transaction已经成功了,后一种异常则处于不能Propagate Transaction的情况(但Get address已成功)。
查看Ado.net的代码,出现异常的位置是在SqlInternalConnection的EnlistNonNull方法中: 其中,GetTransactionCookie方法会触发The operation is not valid for the state of the transaction异常,而PropagateTransactionCookie会触发Transaction context in use by another session异常。 开启微软技术支持case鉴于解决这个问题对我们具有重要价值,我们开启了一个微软技术支持case。技术支持人员倾向于认为这是一个SQLServer已知的限制,但这很难说服自己,原因主要有两点: 1. 首先,我们可以反过来推理,如果这个限制存在,那设计DependentTransaction的意义和价值何在?从MSDN关于DependentTransaction的描述看,明确指示适用于多个worker thread的场景: 2. 其次,也是我认为最不能接受的,是测试结果表明并非所有的测试循环都会失败,总会有大约30%的循环可以成功完成。如果所有任务全部失败,那没有问题,这就是SQLServer或Ado.net的限制。但如果有成功跑完嵌套事务的循环,那说明什么?说明更大可能是存在部分并发冲突的场景,而机制本身并不存在根本性的问题。
最终在微软朋友的帮助下,联系到了SQLTeam的核心开发人员,给出了非常专业的问题原因描述及WorkAround的解决方案:
上面这段话我来解释一下: 1. 当一个事务有第2个SqlConnection加入时,需要从本地事务提升为DTC事务。此时,Ado.net会首先使用事务中的第1个SqlConnection(original connection)来通知SQLServer对事务进行升级。如果此时original connection已关联到一个打开的SqlDataReader,则会抛出异常信息“There is already an open DataReader associated with this Command which must be closed first”,类型为SqlException。 2. 若事务正处于从本地事务提升为DTC事务的过程中,则此时任何获取事务状态的操作会无条件地抛出异常信息“The operation is not valid for the state of the transaction” ,类型为TransactionException。在多线程并行运行的场景,必然存在大量出现获取事务状态的操作。 3. SQLServer任何时候只允许将一个SqlConnection征募(enlist)到事务中。如果在征募过程中,发现该事务有其它SqlConnection也正处于征募状态,或正在执行SQL命令,则会抛出异常信息“Transaction context in use by another session”,类型为SqlException。 4. 异常1和异常2的解决方案是预先将本地事务提升为DTC事务,通过调用TransactionInterop.GetTransmitterPropagationToken实现。详细信息可参见博客文章http://blogs.msdn.com/b/florinlazar/archive/2007/02/08/when-transaction-promotion-goes-bad.aspx 5. 解决异常3没有太好的方案,只能是在打开SqlConnection时捕获异常(SqlException.Number=3910),然后尝试重新打开SqlConnection。这个方案不完美,原因一方面是异常本身的开销,另一方面是为了避免频繁冲突,一般会选择sleep一个短时间再重试,这又多了额外的线程调度开销。Sleep多长时间合适?同样很难抉择。理论上,SqlServer引擎可以扩展,自动消除这个限制。
以上种种问题,其实都发生在DTC事务升级过程中。如果所有SqlConnection都成功征募到事务中,并且事务已成功升级为DTC事务,则后面无论怎样执行SQL Command,都没有问题。这印证了我们当初怀疑多线程复用同一个事务,存在一些并发场景的限制是正确的。 总结一旦解决了多线程复用同一个事务的问题,并行计算就可以应用嵌套事务的模式。由于Workarond方案的额外开销只发生在DTC事务升级过程中,相对而言是很轻量的,解决方案非常令人满意。尽管DTC事务相比本地事务会有性能上的下降,但在实际应用过程中,大量复杂功能节点经并行计算技术改造,往往取得数量级的优化效果,尽显点石成金的魔力。DTC事务的额外开销,完全可以忽略不计。毕竟现在客户使用的服务器,CPU核数往往是24核、32核这样的级别。
到目前为止,尚未看到微软给出明确的官方解决方案,因此本文涉及的技术尚属于未公开的内幕,存在一定的技术风险,特此提醒一下。从个人的角度看,如果并行计算只能使用独立事务模式,而不能使用嵌套事务模式的话,那并行计算技术应用的场景将受到极大的限制,并行计算的价值也就无从充分发挥。
最后,感谢一下微软朋友们的帮助和支持! 附录测试案例的源代码如下(Workarond方案涉及的代码标记为红色): using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Transactions; using System.Data.SqlClient; using System.Threading; namespace DTCTest { class Program { static void Main(string[] args) { try { for (int i = 0; i < 10000; i++) TestMSDNSample(); System.Console.WriteLine("test done"); } catch (Exception e) { System.Console.WriteLine("test fail: " + e.Message); } } static public void TestMSDNSample() { TransactionOptions options = new TransactionOptions(); options.IsolationLevel = IsolationLevel.ReadCommitted; using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required, options)) { // 预先升级为DTC事务 TransactionInterop.GetTransmitterPropagationToken(Transaction.Current); Transaction currentTransaction = Transaction.Current; WorkerThread workerThread = new WorkerThread(); workerThread.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread1"); WorkerThread workerThread2 = new WorkerThread(); workerThread2.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread2"); WorkerThread workerThread3 = new WorkerThread(); workerThread3.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread3"); WorkerThread workerThread4 = new WorkerThread(); workerThread4.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread4"); WorkerThread workerThread5 = new WorkerThread(); workerThread5.DoWork(currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete), "Thread5"); /* Do some transactional work here, then: */ scope.Complete(); } } } public class WorkerThread { public static string Connection_String = "user id=sa;data source=int3;initial catalog=msdb;password=ufsoft";//MultipleActiveResultSets=true"; public const string Run_SQL = "update MSdbms set Version='aaa'"; public void DoWork(DependentTransaction dependentTransaction, string threadName) { Thread thread = new Thread(ThreadMethod); // thread name for debug thread.Name = threadName; thread.Start(dependentTransaction); } public void ThreadMethod(object transaction) { Console.WriteLine("ThreadName=" + Thread.CurrentThread.Name); DependentTransaction dependentTransaction = transaction as DependentTransaction; try { using (TransactionScope ts = new TransactionScope(dependentTransaction)) { /* Perform transactional work here */ SqlConnection sqlConn = new SqlConnection(Connection_String); OpenConn(sqlConn); try { SqlCommand cmd = sqlConn.CreateCommand(); cmd.CommandType = System.Data.CommandType.Text; cmd.CommandText = Run_SQL; cmd.ExecuteNonQuery(); Thread.Sleep(10); cmd.ExecuteNonQuery(); ts.Complete(); } finally { sqlConn.Close(); } } dependentTransaction.Complete(); } catch (Exception e) { dependentTransaction.Rollback() ; Console.WriteLine(String.Format("{0} has an exception raised: {1}", Thread.CurrentThread.Name, e.Message)); } finally { dependentTransaction.Dispose(); } } private void OpenConn(SqlConnection conn) { while (true) { try { conn.Open(); break; } catch (SqlException ex) { Console.WriteLine(ex.Message); if (ex.Number == 3910) { Thread.Sleep(10); continue; } } } } } } |
prog | Programming > prog.lang | .Net >