意料外的问题
/**
* 将指定的 [amount] 的金额,从 [accountA] 转移到 [accountB]
*/
suspend fun transferMoney(accountA: String, accountB: String, amount: Int) {
// 使用了 IO dispatcher,所以该 DB 的操作在 IO 线程上进行
withContext(Dispatchers.IO) {
database.beginTransaction() //在 IO-Thread-1 线程上开始执行事务
try {
// 协程可以在与调度器(这里就是 Dispatchers.IO)相关联的任何线程上绑定并继续执行。同时,由于事务也是在 IO-Thread-1 中开始的,因此我们可能恰好可以成功执行查询。
moneyDao.decrease(accountA, amount) //挂起函数
// 如果协程又继续在 IO-Thread-2 上执行,那么下列操作数据库的代码可能会引起死锁,因为它需要等到 IO-Thread-1 的线程执行结束后才可以继续。
moneyDao.increase(accountB, amount) //挂起函数
database.setTransactionSuccessful() //永远不会执行这一行
} finally {
database.endTransaction() //永远不会执行这一行
}
}
}
Android 的 SQLite 事务受制于单个线程
在协程中使用数据库事务操作可能会引起死锁
简单实现
suspend fun <T> RoomDatabase.runInTransaction(
block: suspend () -> T
): T = withContext(newSingleThreadContext("DB")) {
beginTransaction()
try {
val result = block.invoke(this)
setTransactionSuccessful()
return@runBlocking result
} finally {
endTransaction()
}
}
// 一个很简单的退税函数
suspend fun sendTaxRefund(federalAccount: String, taypayerList: List<Taxpayer>) {
database.runInTransaction {
val refundJobs = taypayerList.map { taxpayer ->
coroutineScope {
// 并行去计算退税金额
async(Dispatchers.IO) {
val amount = irsTool.calculateRefund(taxpayer)
moneyDao.decrease(federalAccount, amount)
moneyDao.increase(taxpayer.account, amount)
}
}
}
// 等待所有计算任务结束
refundJobs.joinAll()
}
}
介绍 withTransaction
fun transferMoney(
accountA: String,
accountB: String,
amount: Int
) = GlobalScope.launch(Dispatchers.Main) {
roomDatabase.withTransaction {
moneyDao.decrease(accountA, amount)
moneyDao.increase(accountB, amount)
}
Toast.makeText(context, "Transfer Completed.", Toast.LENGTH_SHORT).show()
}
withTransaction API
https://developer.android.google.cn/reference/kotlin/androidx/room/package-summary.html#(androidx.room.RoomDatabase).withTransaction(kotlin.coroutines.SuspendFunction0)
withTransaction API 在上下文中创建了三个关键元素:
单线程调度器,用于执行数据库操作;
上下文元素,帮助 DAO 函数判断其是否处在事务中;
ThreadContextElement,用来标记事务协程中所使用的调度线程。
事务调度器
/**
*构建并返回一个 [ContinuationInterceptor] 用来将协程分发到获取到的线程中,并执行事务。[controlJob] 用来通过取消任务来控制线程的释放。
*/
private suspend fun Executor.acquireTransactionThread(
controlJob: Job
): ContinuationInterceptor = suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
// 当我们在等待获取到可用线程时,如果失败了或者任务取消,我们是不能够停止等待这一动作的,但我们可以取消 controlJob,这样一旦获取到控制权,很快就会被释放。
controlJob.cancel()
}
try {
execute {
// runBlocking 创建一个 event loop 来执行协程中的任务代码
runBlocking {
// 获取到线程后,通过返回有 runBlocking 创建的拦截器来恢复 suspendCancellableCoroutine,拦截器将会被用来拦截和分发代码块到获取的线程中
continuation.resume(coroutineContext[ContinuationInterceptor]!!)
// 挂起 runBlocking 协程,直到 controlJob 完成。由于协程是空的,所以这将会阻止 runBlocking 立即结束。
controlJob.join()
}
}
} catch (ex: RejectedExecutionException) {
// 无法获取线程,取消协程
continuation.cancel(
IllegalStateException(
"Unable to acquire a thread to perform the transaction.", ex)
)
}
}
suspendCancellableCoroutine
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/suspend-cancellable-coroutine.html
runBlocking
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
事务上下文元素
internal class TransactionElement(
private val transactionThreadControlJob: Job,
internal val transactionDispatcher: ContinuationInterceptor
) : CoroutineContext.Element {
// Singleton key 用于检索此上下文中的 element
companion object Key : CoroutineContext.Key<TransactionElement>
override val key: CoroutineContext.Key<TransactionElement>
get() = TransactionElement
/**
*这个 element 用来统计事务数量(包含嵌套事务)。调用 [acquire] 来增加计数,调用 [release] 来减少计数。如果在调用 [release] 时计数达到 0,则事务被取消,事务线程会被释放
*/
private val referenceCount = AtomicInteger(0)
fun acquire() {
referenceCount.incrementAndGet()
}
fun release() {
val count = referenceCount.decrementAndGet()
if (count < 0) {
throw IllegalStateException(
"Transaction was never started or was already released.")
} else if (count == 0) {
// 取消控制事务线程的 job 会导致它被 release
transactionThreadControlJob.cancel()
}
}
}
事务线程标记
private final ThreadLocal<Integer> mSuspendingTransactionId = new ThreadLocal<>();
public void assertNotSuspendingTransaction() {
if (!inTransaction() && mSuspendingTransactionId.get() != null) {
throw new IllegalStateException("Cannot access database on a different"
+ " coroutine context inherited from a suspending transaction.");
}
}
private suspend fun RoomDatabase.createTransactionContext(): CoroutineContext {
val controlJob = Job()
val dispatcher = queryExecutor.acquireTransactionThread(controlJob)
val transactionElement = TransactionElement(controlJob, dispatcher)
val threadLocalElement =
suspendingTransactionId.asContextElement(controlJob.hashCode())
return dispatcher + transactionElement + threadLocalElement
}
事务 API 的实现
创建了事务上下文之后,我们终于可以提供一个安全的 API 用于在协程中执行数据库事务。接下来要做的就是将这个上下文和通常的 begin/end 事务模式结合起来:
suspend fun <R> RoomDatabase.withTransaction(
block: suspend () -> R
): R {
// 如果可以的话就使用继承的事务上下文,这样允许嵌套挂起的事务
val transactionContext =
coroutineContext[TransactionElement]?.transactionDispatcher
?: createTransactionContext()
return withContext(transactionContext) {
val transactionElement = coroutineContext[TransactionElement]!!
transactionElement.acquire()
try {
beginTransaction()
try {
// 在一个新的 scope 中封装 suspend 代码块,来等待子协程
val result = coroutineScope {
block.invoke(this)
}
setTransactionSuccessful()
return@withContext result
} finally {
endTransaction()
}
} finally {
transactionElement.release()
}
}
}
推荐阅读