前言
WorkManager 的特点与适用场景
保证任务一定会被执行
WorkManager 有自己的数据库,每一个任务的信息与任务状态,都会保存在本地数据库中。所以即使程序没有在运行,或者在设备重启等情况下,WorkManager 依然可以保证任务的执行,只是不保证任务立即被执行。
WorkManager 的使用
/**
 * Worker that uploads images.
 *
 * The actual task lives in [doWork]; the constructor arguments are simply
 * forwarded to [Worker].
 */
class UploadWorker(
    appContext: Context,
    workerParams: WorkerParameters
) : Worker(appContext, workerParams) {

    /** Uploads the images, then reports the work as finished successfully. */
    override fun doWork(): Result {
        uploadImages()
        return Result.success()
    }
}
3.2 配置运行任务方式和时间
// One-time work: a request for UploadWorker built with default settings.
val uploadWorkRequest = OneTimeWorkRequestBuilder<UploadWorker>().build()
// Recurring work: repeats once every 12 hours.
// NOTE: this is an alternative to the declaration above, not a second variable in the same scope.
val uploadWorkRequest = PeriodicWorkRequestBuilder<UploadWorker>(12, TimeUnit.HOURS)
.build()
// Constraints describing when the task may run: device idle AND charging.
val constraints = Constraints.Builder().apply {
    setRequiresDeviceIdle(true)
    setRequiresCharging(true)
}.build()

// One-time request for CompressWorker that carries the constraints above.
val compressionWork =
    OneTimeWorkRequestBuilder<CompressWorker>().setConstraints(constraints).build()
// One-time request for UploadWorker with an initial delay.
val uploadWorkRequest = OneTimeWorkRequestBuilder<UploadWorker>()
.setInitialDelay(10, TimeUnit.SECONDS)// once the trigger conditions are met, wait 10 seconds before running
.build()
3.3 将任务提交给系统处理
// Hand a single request to WorkManager.
WorkManager.getInstance(myContext).enqueue(uploadWorkRequest)

// Chain several requests. The instance is obtained with a Context, exactly as
// above (the no-argument getInstance() is deprecated), and groups of requests
// are passed as lists, which is what beginWith()/then() accept in WorkManager 2.x.
WorkManager.getInstance(myContext)
    // First, run all the A tasks (in parallel):
    .beginWith(listOf(workA1, workA2, workA3))
    // ...when all A tasks are finished, run the single B task:
    .then(workB)
    // ...then run the C tasks (in any order):
    .then(listOf(workC1, workC2))
    .enqueue()
3.4 观察 Worker 的进度或状态
// Observe the WorkInfo of uploadWorkRequest (looked up by its id) through LiveData,
// with the observer bound to lifecycleOwner.
WorkManager.getInstance(myContext).getWorkInfoByIdLiveData(uploadWorkRequest.id)
.observe(lifecycleOwner, Observer { workInfo ->
// The callback body is left empty in this sample; react to workInfo here.
})
WorkManager 流程分析与源码解析
// Build a one-time request for Worker1 with default settings...
val work1Request = OneTimeWorkRequestBuilder<Worker1>().build()
// ...and enqueue it on the WorkManager instance obtained from this Context.
WorkManager.getInstance(this).enqueue(work1Request)
/**
 * Minimal sample worker used to trace the execution flow: it blocks for five
 * seconds and then reports success.
 */
class Worker1(
    appContext: Context,
    workerParams: WorkerParameters
) : Worker(appContext, workerParams) {

    /** Sleeps for 5000 ms to simulate work, then returns a success result. */
    override fun doWork(): Result {
        Thread.sleep(5_000L)
        return Result.success()
    }
}
//AndroidManifest.xml
<!-- Non-exported provider entry for androidx.work.impl.WorkManagerInitializer. -->
<provider
android:name="androidx.work.impl.WorkManagerInitializer"
android:exported="false"
android:multiprocess="true"
android:authorities="com.jandroid.multivideo.workmanager-init"
android:directBootAware="false" />
WorkManagerInitializer 的 onCreate() 方法:
//WorkManagerInitializer
/**
 * Initializes WorkManager with a default-built {@link Configuration}.
 *
 * @return always {@code true}
 */
public boolean onCreate() {
    final Configuration defaultConfiguration = new Configuration.Builder().build();
    WorkManager.initialize(getContext(), defaultConfiguration);
    return true;
}
由于 WorkManager 是个单例,在此时 WorkManager 就已经被初始化了。在 initialize() 之前,会创建一个默认的 Configuration。Configuration 包含许多属性,用来控制任务的管理和调度方式。通常我们使用 WorkManager 默认创建的 Configuration 即可。如需使用自己的 Configuration,可参考官方文档,有明确的使用说明。我们继续看 initialize() 的实现,由于 WorkManager 是个抽象类,真正的初始化逻辑是在它的子类 WorkManagerImpl 中实现的:
//WorkManagerImpl
(RestrictTo.Scope.LIBRARY_GROUP)
public static void initialize( Context context, Configuration configuration) {
synchronized (sLock) {
if (sDelegatedInstance != null && sDefaultInstance != null) {
throw new IllegalStateException("WorkManager is already initialized. Did you "
+ "try to initialize it manually without disabling "
+ "WorkManagerInitializer? See "
+ "WorkManager#initialize(Context, Configuration) or the class level "
+ "Javadoc for more information.");
}
if (sDelegatedInstance == null) {
context = context.getApplicationContext();
if (sDefaultInstance == null) {
sDefaultInstance = new WorkManagerImpl(
context,
configuration,
new WorkManagerTaskExecutor(configuration.getTaskExecutor()));
}
sDelegatedInstance = sDefaultInstance;
}
}
}
此时 sDelegatedInstance 为 null,WorkManager 会先创建一个默认的 WorkManagerTaskExecutor 对象,用来执行 WorkManager 的任务。之后创建一个 WorkManagerImpl 对象:
//WorkManagerImpl
/**
 * Convenience constructor: delegates to the four-argument overload, reading the
 * {@code useTestDatabase} flag from the boolean resource
 * {@code R.bool.workmanager_test_configuration}.
 */
public WorkManagerImpl(
Context context,
Configuration configuration,
TaskExecutor workTaskExecutor) {
this(context,
configuration,
workTaskExecutor,
context.getResources().getBoolean(R.bool.workmanager_test_configuration));
}
//WorkManagerImpl
/**
 * Creates the {@link WorkDatabase} and delegates to the overload that takes a database.
 *
 * @param useTestDatabase forwarded unchanged to {@code WorkDatabase.create()}
 */
public WorkManagerImpl(
Context context,
Configuration configuration,
TaskExecutor workTaskExecutor,
boolean useTestDatabase) {
this(context,
configuration,
workTaskExecutor,
// The database is created from the application context and the task executor's
// background executor.
WorkDatabase.create(
context.getApplicationContext(),
workTaskExecutor.getBackgroundExecutor(),
useTestDatabase)
);
}
WorkManager 在此时创建了数据库。WorkDatabase.create() 将任务列表序列化到本地,记录每一个任务的属性,执行条件,执行顺序及执行状态等。从而保证任务在冷启动或硬件重启后,可以根据条件继续执行。接着看 this() 的实现:
//WorkManagerImpl
/**
 * Main constructor: installs the logger, creates the schedulers and the
 * {@link Processor}, then hands everything to {@code internalInit()}.
 *
 * @param context          context used to build the Processor and passed to internalInit();
 *                         its application context is used to create the schedulers
 * @param configuration    supplies the minimum logging level and is passed on
 * @param workTaskExecutor executor shared by the schedulers and the Processor
 * @param database         the already-created work database
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public WorkManagerImpl(
        Context context,
        Configuration configuration,
        TaskExecutor workTaskExecutor,
        WorkDatabase database) {
    Context applicationContext = context.getApplicationContext();
    Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));
    List<Scheduler> schedulers = createSchedulers(applicationContext, workTaskExecutor);
    Processor processor = new Processor(
            context,
            configuration,
            workTaskExecutor,
            database,
            schedulers);
    internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);
}
到这里有三个重要的初始化步骤。分别是 createSchedulers() 来根据 Build Version 创建不同的 Schedulers 进行任务调度,Processor() 用来管理 Schedulers 的执行,和 internalInit() 真正的初始化。先看 createSchedulers() 的实现:
//WorkManagerImpl
/**
 * Builds the list of schedulers used by this WorkManager: the best available
 * background scheduler followed by a {@link GreedyScheduler}.
 *
 * @param context      context handed to both schedulers
 * @param taskExecutor executor handed to the GreedyScheduler
 * @return a fixed-size list holding the two schedulers, in that order
 */
public List<Scheduler> createSchedulers(Context context, TaskExecutor taskExecutor) {
    final Scheduler backgroundScheduler =
            Schedulers.createBestAvailableBackgroundScheduler(context, this);
    // The task executor is passed in directly because this runs before internalInit();
    // GreedyScheduler creates its ConstraintTrackers and controllers eagerly.
    final Scheduler greedyScheduler = new GreedyScheduler(context, taskExecutor, this);
    return Arrays.asList(backgroundScheduler, greedyScheduler);
}
该方法返回一个 Scheduler 列表 (List<Scheduler>)。其中 GreedyScheduler 是常驻的,用来执行没有任何约束的非周期性的任务。接下来看 createBestAvailableBackgroundScheduler() 的实现。
//Schedulers
/**
 * Picks the background scheduler for the current platform.
 *
 * <p>At or above {@code WorkManagerImpl.MIN_JOB_SCHEDULER_API_LEVEL} a
 * {@link SystemJobScheduler} is created and {@code SystemJobService} is enabled.
 * Below that level a GCM-based scheduler is tried first; if none can be created,
 * a {@link SystemAlarmScheduler} is used and {@code SystemAlarmService} is enabled.
 *
 * @return the scheduler that was created
 */
static Scheduler createBestAvailableBackgroundScheduler(
Context context,
WorkManagerImpl workManager) {
Scheduler scheduler;
if (Build.VERSION.SDK_INT >= WorkManagerImpl.MIN_JOB_SCHEDULER_API_LEVEL) {
scheduler = new SystemJobScheduler(context, workManager);
setComponentEnabled(context, SystemJobService.class, true);
Logger.get().debug(TAG, "Created SystemJobScheduler and enabled SystemJobService");
} else {
scheduler = tryCreateGcmBasedScheduler(context);
// A null result means no GCM-based scheduler is available; fall back to alarms.
if (scheduler == null) {
scheduler = new SystemAlarmScheduler(context);
setComponentEnabled(context, SystemAlarmService.class, true);
Logger.get().debug(TAG, "Created SystemAlarmScheduler");
}
}
return scheduler;
}
这段代码对 build version 进行了判断。若 >=23,则返回 SystemJobScheduler(),即利用 JobScheduler 进行任务管理。<23 的时候先尝试使用 GcmBasedScheduler 进行管理。若无法创建 GcmBasedScheduler 则返回 SystemAlarmScheduler(),使用 AlarmManager 进行任务管理。返回的这个 Scheduler 是用来执行周期性,或者有约束性的任务。由此可见,WorkManager 创建了两个 Scheduler,分别为执行非约束非周期性任务的 GreedyScheduler,和执行约束性周期性任务的 SystemJobScheduler/GcmBasedScheduler/SystemAlarmScheduler。
这几种 Scheduler 的构造和执行之后再分析。
之后初始化 Processor。Processor 存储了 Configuration,TaskExecutor,WorkDatabase,schedulers 等,用来在适当的时机进行任务调度。再来看 internalInit():
//WorkManagerImpl
/**
 * Stores the collaborators on this instance and schedules the force-stop check.
 *
 * @param context         any context; only its application context is stored
 * @param configuration   configuration to keep
 * @param workTaskExecutor executor to keep; also used to run the ForceStopRunnable
 * @param workDatabase    database to keep; also backs the PreferenceUtils
 * @param schedulers      schedulers to keep
 * @param processor       processor to keep
 */
private void internalInit(
        Context context,
        Configuration configuration,
        TaskExecutor workTaskExecutor,
        WorkDatabase workDatabase,
        List<Scheduler> schedulers,
        Processor processor) {
    mContext = context.getApplicationContext();
    mConfiguration = configuration;
    mWorkTaskExecutor = workTaskExecutor;
    mWorkDatabase = workDatabase;
    mSchedulers = schedulers;
    mProcessor = processor;
    mPreferenceUtils = new PreferenceUtils(workDatabase);
    mForceStopRunnableCompleted = false;

    // Check for app force stops on a background thread.
    final ForceStopRunnable forceStopCheck = new ForceStopRunnable(mContext, this);
    mWorkTaskExecutor.executeOnBackgroundThread(forceStopCheck);
}
记录了 Configuration,TaskExecutor,WorkDatabase,schedulers,Processor 等。然后我们看最后一行执行语句,启动了一个 ForceStopRunnable,这个 Runnable 是干什么用的呢?直接看 run() 的实现:
//ForceStopRunnable
/**
 * Migrates the database if needed, performs cleanup, and then takes exactly one of
 * three actions: reschedule all eligible work when rescheduling is pending,
 * reschedule when the app was force-stopped, or schedule the unfinished work
 * reported by {@code cleanUp()}. Finally notifies the WorkManager that this
 * runnable has completed.
 *
 * @throws IllegalStateException wrapping the SQLite exception when the database
 *                               cannot be opened, is corrupt, or is not accessible
 */
@Override
public void run() {
// Migrate the database to the no-backup directory if necessary.
WorkDatabasePathHelper.migrateDatabase(mContext);
// Clean invalid jobs attributed to WorkManager, and Workers that might have been
// interrupted because the application crashed (RUNNING state).
Logger.get().debug(TAG, "Performing cleanup operations.");
try {
// The result is only consulted when neither of the two reschedule branches applies.
boolean needsScheduling = cleanUp();
if (shouldRescheduleWorkers()) {
Logger.get().debug(TAG, "Rescheduling Workers.");
mWorkManager.rescheduleEligibleWork();
// Mark the jobs as migrated.
mWorkManager.getPreferenceUtils().setNeedsReschedule(false);
} else if (isForceStopped()) {
Logger.get().debug(TAG, "Application was force-stopped, rescheduling.");
mWorkManager.rescheduleEligibleWork();
} else if (needsScheduling) {
Logger.get().debug(TAG, "Found unfinished work, scheduling it.");
Schedulers.schedule(
mWorkManager.getConfiguration(),
mWorkManager.getWorkDatabase(),
mWorkManager.getSchedulers());
}
mWorkManager.onForceStopRunnableCompleted();
} catch (SQLiteCantOpenDatabaseException
| SQLiteDatabaseCorruptException
| SQLiteAccessPermException exception) {
// ForceStopRunnable is usually the first thing that accesses a database (or an app's
// internal data directory). This means that weird PackageManager bugs are attributed
// to ForceStopRunnable, which is unfortunate. This gives the developer a better error
// message.
String message =
"The file system on the device is in a bad state. WorkManager cannot access "
+ "the app's internal data store.";
Logger.get().error(TAG, message, exception);
throw new IllegalStateException(message, exception);
}
}
这段代码的实现细节先不做深究。但是很明显,这个 Runnable 的作用就是在 WorkManager 初始化过程中,发现了未完成的,需要重新执行的任务,或者 app 被强制 kill 的情况下,直接对 Scheduler 进行调度。到此,一个 WorkManager 的初始化流程就完成了。
总结:
Schedulers 有两个。
4.1.2. WorkRequest 的创建
// Build a one-time request for Worker1 with default settings.
val work1Request = OneTimeWorkRequestBuilder<Worker1>().build()
//OneTimeWorkRequest.Builder
/**
* Creates a {@link OneTimeWorkRequest}.
*
* @param workerClass The {@link ListenableWorker} class to run for this work
*/
public Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
super(workerClass);
// Record OverwritingInputMerger as the input merger class of this request's WorkSpec.
mWorkSpec.inputMergerClassName = OverwritingInputMerger.class.getName();
}
//WorkRequest.Builder
Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
mId = UUID.randomUUID();
mWorkerClass = workerClass;
mWorkSpec = new WorkSpec(mId.toString(), workerClass.getName());
addTag(workerClass.getName());
}
//WorkRequest.Builder
/**
 * Builds the request from the builder's current state.
 *
 * <p>After building, the builder receives a new id and a copy of its WorkSpec
 * carrying that id, so the same builder can be used again.
 *
 * @return the request produced by {@code buildInternal()}
 */
public final W build() {
    final W request = buildInternal();

    // Prepare the builder for reuse: new id, copied WorkSpec.
    final UUID nextId = UUID.randomUUID();
    final WorkSpec nextSpec = new WorkSpec(mWorkSpec);
    nextSpec.id = nextId.toString();
    mId = nextId;
    mWorkSpec = nextSpec;

    return request;
}
//OneTimeWorkRequest.Builder
/**
 * Validates the builder state and creates the {@link OneTimeWorkRequest}.
 *
 * @throws IllegalArgumentException if backoff criteria or foreground execution
 *                                  is combined with an idle-mode constraint
 */
OneTimeWorkRequest buildInternal() {
    if (mBackoffCriteriaSet && isIdleModeJob()) {
        throw new IllegalArgumentException(
                "Cannot set backoff criteria on an idle mode job");
    }
    if (mWorkSpec.runInForeground && isIdleModeJob()) {
        throw new IllegalArgumentException(
                "Cannot run in foreground with an idle mode constraint");
    }
    return new OneTimeWorkRequest(this);
}

/** Returns true on SDK 23+ when the WorkSpec's constraints require device idle. */
private boolean isIdleModeJob() {
    return Build.VERSION.SDK_INT >= 23 && mWorkSpec.constraints.requiresDeviceIdle();
}
//OneTimeWorkRequest
/**
 * Creates the request from the builder's id, WorkSpec and tags, which are
 * forwarded to the superclass constructor.
 */
OneTimeWorkRequest(Builder builder) {
super(builder.mId, builder.mWorkSpec, builder.mTags);
}
//WorkRequest.Builder
/**
 * Builds the request via {@code buildInternal()}, then gives the builder a new id
 * and a copied WorkSpec so it can be reused.
 */
public final W build() {
W returnValue = buildInternal();
// Create a new id and WorkSpec so this WorkRequest.Builder can be used multiple times.
mId = UUID.randomUUID();
mWorkSpec = new WorkSpec(mWorkSpec);
mWorkSpec.id = mId.toString();
return returnValue;
}
总结:
WorkRequest 创建的流程图
4.2 非约束条件任务的执行过程
执行 OneTimeWorkRequest
// Enqueue the request on the WorkManager instance obtained from this Context.
WorkManager.getInstance(this).enqueue(work1Request)
//WorkManager
/**
 * Enqueues a single request by wrapping it in a one-element list and
 * delegating to the list overload.
 *
 * @param workRequest the request to enqueue
 * @return the {@link Operation} returned by the list overload
 */
public final Operation enqueue(WorkRequest workRequest) {
    final List<WorkRequest> singleRequest = Collections.singletonList(workRequest);
    return enqueue(singleRequest);
}
//WorkManager
/**
 * Enqueues the given requests; implemented by subclasses.
 *
 * @param requests the requests to enqueue
 * @return an {@link Operation} for the enqueue call
 */
public abstract Operation enqueue( List<? extends WorkRequest> requests);
//WorkManagerImpl
public Operation enqueue(
extends WorkRequest> workRequests) { List<?
// This error is not being propagated as part of the Operation, as we want the
// app to crash during development. Having no workRequests is always a developer error.
if (workRequests.isEmpty()) {
throw new IllegalArgumentException(
"enqueue needs at least one WorkRequest.");
}
return new WorkContinuationImpl(this, workRequests).enqueue();
}
创建一个 WorkContinuationImpl() 对象,再执行 enqueue() 方法。WorkContinuationImpl 是 WorkContinuation 的子类。用来把多个 OneTimeWorkRequest 根据需求串行,并行或合并处理。我们熟悉的 then(),combine(),enqueue() 等都是这个类的方法。
//WorkContinuationImpl
/**
 * Creates a continuation for the given work, optionally chained after parents.
 *
 * <p>{@code mIds} receives the string ids of this continuation's own work;
 * {@code mAllIds} receives the ids of all parents first, followed by those
 * same own ids.
 *
 * @param workManagerImpl    the owning WorkManager implementation
 * @param name               stored as the continuation's name
 * @param existingWorkPolicy stored as the continuation's existing-work policy
 * @param work               the requests belonging to this continuation
 * @param parents            parent continuations, or {@code null} for none
 */
WorkContinuationImpl(WorkManagerImpl workManagerImpl,
        String name,
        ExistingWorkPolicy existingWorkPolicy,
        List<? extends WorkRequest> work,
        List<WorkContinuationImpl> parents) {
    mWorkManagerImpl = workManagerImpl;
    mName = name;
    mExistingWorkPolicy = existingWorkPolicy;
    mWork = work;
    mParents = parents;
    mIds = new ArrayList<>(mWork.size());
    mAllIds = new ArrayList<>();
    if (parents != null) {
        for (WorkContinuationImpl parent : parents) {
            mAllIds.addAll(parent.mAllIds);
        }
    }
    for (int i = 0; i < work.size(); i++) {
        String id = work.get(i).getStringId();
        mIds.add(id);
        mAllIds.add(id);
    }
}
//WorkContinuationImpl
/**
 * Enqueues this continuation unless it has already been enqueued.
 *
 * <p>On first use an {@link EnqueueRunnable} is submitted to the background
 * thread and its operation is stored; on later calls a warning is logged.
 *
 * @return the stored {@link Operation}
 */
public Operation enqueue() {
    if (mEnqueued) {
        Logger.get().warning(TAG,
                String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));
        return mOperation;
    }

    // The runnable walks the hierarchy of the continuations
    // and marks them enqueued using the markEnqueued() method, parent first.
    final EnqueueRunnable enqueueTask = new EnqueueRunnable(this);
    mWorkManagerImpl.getWorkTaskExecutor().executeOnBackgroundThread(enqueueTask);
    mOperation = enqueueTask.getOperation();
    return mOperation;
}
//EnqueueRunnable
/**
 * Adds the continuation's work to the database and, when that reports that
 * scheduling is needed, enables RescheduleReceiver and schedules the work.
 * The outcome is published through {@code mOperation}: SUCCESS on normal
 * completion, FAILURE carrying the throwable otherwise.
 */
public void run() {
try {
// A continuation containing cycles is rejected; the exception is caught below
// and reported as a FAILURE state.
if (mWorkContinuation.hasCycles()) {
throw new IllegalStateException(
String.format("WorkContinuation has cycles (%s)", mWorkContinuation));
}
boolean needsScheduling = addToDatabase();
if (needsScheduling) {
// Enable RescheduleReceiver, only when there are Worker's that need scheduling.
final Context context =
mWorkContinuation.getWorkManagerImpl().getApplicationContext();
PackageManagerHelper.setComponentEnabled(context, RescheduleReceiver.class, true);
scheduleWorkInBackground();
}
mOperation.setState(Operation.SUCCESS);
} catch (Throwable exception) {
mOperation.setState(new Operation.State.FAILURE(exception));
}
}
//AndroidManifest
<!-- RescheduleReceiver: declared disabled (android:enabled="false") and not exported;
     its filter lists BOOT_COMPLETED, TIME_SET and TIMEZONE_CHANGED. -->
<receiver android:directBootAware="false" android:enabled="false" android:exported="false" android:name="androidx.work.impl.background.systemalarm.RescheduleReceiver">
<intent-filter>
<action android:name="android.intent.action.BOOT_COMPLETED"/>
<action android:name="android.intent.action.TIME_SET"/>
<action android:name="android.intent.action.TIMEZONE_CHANGED"/>
</intent-filter>
</receiver>
//EnqueueRunnable
/**
 * Asks {@link Schedulers} to schedule eligible work, using the configuration,
 * database and schedulers of the continuation's WorkManager.
 */
@VisibleForTesting
public void scheduleWorkInBackground() {
    final WorkManagerImpl impl = mWorkContinuation.getWorkManagerImpl();
    Schedulers.schedule(impl.getConfiguration(), impl.getWorkDatabase(), impl.getSchedulers());
}
//Schedulers
/**
 * Fetches the WorkSpecs eligible for scheduling, marks them as scheduled inside
 * one database transaction, and then passes them to every scheduler.
 *
 * @param configuration supplies the maximum number of WorkSpecs to fetch
 * @param workDatabase  database holding the WorkSpecs
 * @param schedulers    schedulers to delegate to; a null or empty list is a no-op
 */
public static void schedule(
@NonNull Configuration configuration,
@NonNull WorkDatabase workDatabase,
List<Scheduler> schedulers) {
if (schedulers == null || schedulers.size() == 0) {
return;
}
WorkSpecDao workSpecDao = workDatabase.workSpecDao();
List<WorkSpec> eligibleWorkSpecs;
workDatabase.beginTransaction();
try {
eligibleWorkSpecs = workSpecDao.getEligibleWorkForScheduling(
configuration.getMaxSchedulerLimit());
if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
long now = System.currentTimeMillis();
// Mark all the WorkSpecs as scheduled.
// Calls to Scheduler#schedule() could potentially result in more schedules
// on a separate thread. Therefore, this needs to be done first.
for (WorkSpec workSpec : eligibleWorkSpecs) {
workSpecDao.markWorkSpecScheduled(workSpec.id, now);
}
}
workDatabase.setTransactionSuccessful();
} finally {
workDatabase.endTransaction();
}
// The schedulers are invoked only after the transaction above has ended.
if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
WorkSpec[] eligibleWorkSpecsArray = eligibleWorkSpecs.toArray(new WorkSpec[0]);
// Delegate to the underlying scheduler.
for (Scheduler scheduler : schedulers) {
scheduler.schedule(eligibleWorkSpecsArray);
}
}
}
//GreedyScheduler
/**
 * Handles WorkSpecs that are ENQUEUED, non-periodic, have no initial delay and are
 * not backed off; all others are skipped.
 *
 * <p>Unconstrained WorkSpecs are started immediately. Constrained ones are added to
 * the constraint tracker, except those requiring device idle (SDK 23+) or content
 * URI triggers (SDK 24+), which are logged and skipped. Requests arriving in a
 * process other than the main one are ignored entirely.
 *
 * @param workSpecs the WorkSpecs to consider
 */
@Override
public void schedule(@NonNull WorkSpec... workSpecs) {
// Computed on first use and cached in the field.
if (mIsMainProcess == null) {
// The default process name is the package name.
mIsMainProcess = TextUtils.equals(mContext.getPackageName(), getProcessName());
}
if (!mIsMainProcess) {
Logger.get().info(TAG, "Ignoring schedule request in non-main process");
return;
}
registerExecutionListenerIfNeeded();
// Keep track of the list of new WorkSpecs whose constraints need to be tracked.
// Add them to the known list of constrained WorkSpecs and call replace() on
// WorkConstraintsTracker. That way we only need to synchronize on the part where we
// are updating mConstrainedWorkSpecs.
List<WorkSpec> constrainedWorkSpecs = new ArrayList<>();
List<String> constrainedWorkSpecIds = new ArrayList<>();
for (WorkSpec workSpec : workSpecs) {
if (workSpec.state == WorkInfo.State.ENQUEUED
&& !workSpec.isPeriodic()
&& workSpec.initialDelay == 0L
&& !workSpec.isBackedOff()) {
if (workSpec.hasConstraints()) {
if (SDK_INT >= 23 && workSpec.constraints.requiresDeviceIdle()) {
// Ignore requests that have an idle mode constraint.
Logger.get().debug(TAG,
String.format("Ignoring WorkSpec %s, Requires device idle.",
workSpec));
} else if (SDK_INT >= 24 && workSpec.constraints.hasContentUriTriggers()) {
// Ignore requests that have content uri triggers.
Logger.get().debug(TAG,
String.format("Ignoring WorkSpec %s, Requires ContentUri triggers.",
workSpec));
} else {
constrainedWorkSpecs.add(workSpec);
constrainedWorkSpecIds.add(workSpec.id);
}
} else {
// No constraints: start the work right away.
Logger.get().debug(TAG, String.format("Starting work for %s", workSpec.id));
mWorkManagerImpl.startWork(workSpec.id);
}
}
}
// onExecuted() which is called on the main thread also modifies the list of mConstrained
// WorkSpecs. Therefore we need to lock here.
synchronized (mLock) {
if (!constrainedWorkSpecs.isEmpty()) {
Logger.get().debug(TAG, String.format("Starting tracking for [%s]",
TextUtils.join(",", constrainedWorkSpecIds)));
mConstrainedWorkSpecs.addAll(constrainedWorkSpecs);
mWorkConstraintsTracker.replace(mConstrainedWorkSpecs);
}
}
}
//GreedyScheduler
mWorkManagerImpl.startWork(workSpec.id);
//WorkManagerImpl
/**
* @param workSpecId The {@link WorkSpec} id to start
* @hide
*/
(RestrictTo.Scope.LIBRARY_GROUP)
public void startWork(@NonNull String workSpecId) {
startWork(workSpecId, null);
}
//WorkManagerImpl
/**
 * Submits a {@link StartWorkRunnable} for the given work to the background thread.
 *
 * @param workSpecId The {@link WorkSpec} id to start
 * @param runtimeExtras The {@link WorkerParameters.RuntimeExtras} associated with this work
 * @hide
 */
public void startWork(String workSpecId, WorkerParameters.RuntimeExtras runtimeExtras) {
    final StartWorkRunnable startTask =
            new StartWorkRunnable(this, workSpecId, runtimeExtras);
    mWorkTaskExecutor.executeOnBackgroundThread(startTask);
}
//StartWorkRunnable
/** Forwards the stored WorkSpec id and runtime extras to the WorkManager's Processor. */
public void run() {
mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras);
}
//Processor
/**
* Starts a given unit of work in the background.
*
* <p>Returns early when the id is already present in the enqueued-work map.
* Otherwise a {@link WorkerWrapper} is built, a completion listener is attached to
* its future on the main-thread executor, and the wrapper is executed on the
* background executor.
*
* @param id The work id to execute.
* @param runtimeExtras The {@link WorkerParameters.RuntimeExtras} for this work, if any.
* @return {@code true} if the work was successfully enqueued for processing
*/
public boolean startWork(
String id,
WorkerParameters.RuntimeExtras runtimeExtras) {
WorkerWrapper workWrapper;
synchronized (mLock) {
// Work may get triggered multiple times if they have passing constraints
// and new work with those constraints are added.
if (mEnqueuedWorkMap.containsKey(id)) {
Logger.get().debug(
TAG,
String.format("Work %s is already enqueued for processing", id));
return false;
}
workWrapper =
new WorkerWrapper.Builder(
mAppContext,
mConfiguration,
mWorkTaskExecutor,
this,
mWorkDatabase,
id)
.withSchedulers(mSchedulers)
.withRuntimeExtras(runtimeExtras)
.build();
ListenableFuture<Boolean> future = workWrapper.getFuture();
future.addListener(
new FutureListener(this, id, future),
mWorkTaskExecutor.getMainThreadExecutor());
mEnqueuedWorkMap.put(id, workWrapper);
}
// The wrapper is submitted after the lock has been released.
mWorkTaskExecutor.getBackgroundExecutor().execute(workWrapper);
Logger.get().debug(TAG, String.format("%s: processing %s", getClass().getSimpleName(), id));
return true;
}
//WorkerWrapper
/**
 * Loads the tags of this WorkSpec, builds the work description from them, and
 * then runs the worker.
 */
public void run() {
mTags = mWorkTagDao.getTagsForWorkSpecId(mWorkSpecId);
mWorkDescription = createWorkDescription(mTags);
runWorker();
}
//WorkerWrapper
private void runWorker() {
if (tryCheckForInterruptionAndResolve()) {
return;
}
mWorkDatabase.beginTransaction();
try {
mWorkSpec = mWorkSpecDao.getWorkSpec(mWorkSpecId);
...
mWorkDatabase.setTransactionSuccessful();
} finally {
mWorkDatabase.endTransaction();
}
// Merge inputs. This can be potentially expensive code, so this should not be done inside
// a database transaction.
...
WorkerParameters params = new WorkerParameters(
UUID.fromString(mWorkSpecId),
input,
mTags,
mRuntimeExtras,
mWorkSpec.runAttemptCount,
mConfiguration.getExecutor(),
mWorkTaskExecutor,
mConfiguration.getWorkerFactory(),
new WorkProgressUpdater(mWorkDatabase, mWorkTaskExecutor),
new WorkForegroundUpdater(mForegroundProcessor, mWorkTaskExecutor));
// Not always creating a worker here, as the WorkerWrapper.Builder can set a worker override
// in test mode.
if (mWorker == null) {
mWorker = mConfiguration.getWorkerFactory().createWorkerWithDefaultFallback(
mAppContext,
mWorkSpec.workerClassName,
params);
}
...
// Try to set the work to the running state. Note that this may fail because another thread
// may have modified the DB since we checked last at the top of this function.
if (trySetRunning()) {
if (tryCheckForInterruptionAndResolve()) {
return;
}
final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();
// Call mWorker.startWork() on the main thread.
mWorkTaskExecutor.getMainThreadExecutor()
.execute(new Runnable() {
@Override
public void run() {
try {
Logger.get().debug(TAG, String.format("Starting work for %s",
mWorkSpec.workerClassName));
mInnerFuture = mWorker.startWork();
future.setFuture(mInnerFuture);
} catch (Throwable e) {
future.setException(e);
}
}
});
// Avoid synthetic accessors.
...
}
//ListenableWorker
/**
 * Starts the work; implemented by subclasses.
 *
 * @return a future that yields the {@code Result} of the work
 */
public abstract ListenableFuture<Result> startWork();
//Worker
/**
 * Runs {@code doWork()} on the background executor and exposes its outcome
 * through a future.
 *
 * @return a future completed with the result of {@code doWork()}, or failed with
 *         whatever {@code doWork()} throws
 */
public final ListenableFuture<Result> startWork() {
mFuture = SettableFuture.create();
getBackgroundExecutor().execute(new Runnable() {
public void run() {
try {
Result result = doWork();
mFuture.set(result);
} catch (Throwable throwable) {
// Any throwable from doWork() is delivered through the future.
mFuture.setException(throwable);
}
}
});
return mFuture;
}
在 run() 的实现执行了 doWork() 方法,即执行了我们 Worker1 的 doWork() 方法。
//Worker1
// Sample worker: its doWork() sleeps for 5000 ms and then returns a success result.
class Worker1(appContext: Context, workerParams: WorkerParameters) :
Worker(appContext, workerParams) {
override fun doWork(): Result {
// Simulate a task that takes five seconds.
Thread.sleep(5000)
return Result.success()
}
}
在执行完这个任务后,返回了 success。Worker 也就执行完成了。回到 WorkerWrapper 的 runWorker() 方法看接下来的处理:
//WorkerWrapper
private void runWorker() {
...
final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();
// Call mWorker.startWork() on the main thread.
mWorkTaskExecutor.getMainThreadExecutor()
.execute(new Runnable() {
@Override
public void run() {
try {
Logger.get().debug(TAG, String.format("Starting work for %s",
mWorkSpec.workerClassName));
mInnerFuture = mWorker.startWork();
future.setFuture(mInnerFuture);
} catch (Throwable e) {
future.setException(e);
}
}
});
// Avoid synthetic accessors.
final String workDescription = mWorkDescription;
future.addListener(new Runnable() {
@Override
@SuppressLint("SyntheticAccessor")
public void run() {
try {
// If the ListenableWorker returns a null result treat it as a failure.
ListenableWorker.Result result = future.get();
if (result == null) {
Logger.get().error(TAG, String.format(
"%s returned a null result. Treating it as a failure.",
mWorkSpec.workerClassName));
} else {
Logger.get().debug(TAG, String.format("%s returned a %s result.",
mWorkSpec.workerClassName, result));
mResult = result;
}
} catch (CancellationException exception) {
// Cancellations need to be treated with care here because innerFuture
// cancellations will bubble up, and we need to gracefully handle that.
Logger.get().info(TAG, String.format("%s was cancelled", workDescription),
exception);
} catch (InterruptedException | ExecutionException exception) {
Logger.get().error(TAG,
String.format("%s failed because it threw an exception/error",
workDescription), exception);
} finally {
onWorkFinished();
}
}
}, mWorkTaskExecutor.getBackgroundExecutor());
} else {
resolveIncorrectStatus();
}
}
//WorkerWrapper
/**
 * Finalizes the work after the worker's future has completed.
 *
 * <p>Unless an interruption is detected and resolved first, the current state is
 * read inside a transaction and handled as follows: a missing state is resolved as
 * not needing a reschedule and counted as finished; RUNNING is processed through
 * {@code handleResult()}; any other unfinished state is rescheduled. Afterwards, if
 * schedulers are present, finished work is cancelled in every scheduler and
 * {@code Schedulers.schedule()} is invoked.
 */
void onWorkFinished() {
boolean isWorkFinished = false;
if (!tryCheckForInterruptionAndResolve()) {
mWorkDatabase.beginTransaction();
try {
WorkInfo.State state = mWorkSpecDao.getState(mWorkSpecId);
mWorkDatabase.workProgressDao().delete(mWorkSpecId);
if (state == null) {
// state can be null here with a REPLACE on beginUniqueWork().
// Treat it as a failure, and rescheduleAndResolve() will
// turn into a no-op. We still need to notify potential observers
// holding on to wake locks on our behalf.
resolve(false);
isWorkFinished = true;
} else if (state == RUNNING) {
handleResult(mResult);
// Update state after a call to handleResult()
state = mWorkSpecDao.getState(mWorkSpecId);
isWorkFinished = state.isFinished();
} else if (!state.isFinished()) {
rescheduleAndResolve();
}
mWorkDatabase.setTransactionSuccessful();
} finally {
mWorkDatabase.endTransaction();
}
}
// Try to schedule any newly-unblocked workers, and workers requiring rescheduling (such as
// periodic work using AlarmManager). This code runs after runWorker() because it should
// happen in its own transaction.
// Cancel this work in other schedulers. For example, if this work was
// completed by GreedyScheduler, we should make sure JobScheduler is informed
// that it should remove this job and AlarmManager should remove all related alarms.
if (mSchedulers != null) {
if (isWorkFinished) {
for (Scheduler scheduler : mSchedulers) {
scheduler.cancel(mWorkSpecId);
}
}
Schedulers.schedule(mConfiguration, mWorkDatabase, mSchedulers);
}
}
//WorkerWrapper
/**
 * Applies the worker's result to the work's state.
 *
 * <p>Retry reschedules the work. Success and every other result (treated as
 * failure) reset periodic work; non-periodic work is marked succeeded or
 * failed respectively.
 *
 * @param result the result produced by the worker
 */
private void handleResult(ListenableWorker.Result result) {
    if (result instanceof ListenableWorker.Result.Retry) {
        Logger.get().info(
                TAG,
                String.format("Worker result RETRY for %s", mWorkDescription));
        rescheduleAndResolve();
        return;
    }

    final boolean succeeded = result instanceof ListenableWorker.Result.Success;
    final String logFormat = succeeded
            ? "Worker result SUCCESS for %s"
            : "Worker result FAILURE for %s";
    Logger.get().info(TAG, String.format(logFormat, mWorkDescription));

    if (mWorkSpec.isPeriodic()) {
        // Periodic work is reset regardless of success or failure.
        resetPeriodicAndResolve();
    } else if (succeeded) {
        setSucceededAndResolve();
    } else {
        setFailedAndResolve();
    }
}
总结:
在 WorkManager 执行了 enqueue() 后,创建 WorkContinuationImpl 对象执行 enqueue() 方法。
WorkContinuationImpl 持有的 EnqueueRunnable 对象将任务添加到 db,并交给 Schedulers 去调度。
Schedulers 将任务交给每一个 Scheduler 去处理。在我们的示例中,GreedyScheduler 会先处理这个任务。
GreedyScheduler 经过一系列判断后,调用 WorkManager 的 startWork() 方法执行这种一次性,非延迟,无约束的任务。
WorkManager 持有的 StartWorkRunnable 对象会将任务交给 Processor 去处理,执行 startWork() 方法。
Processor 创建一个 WorkerWrapper 对象,由它去调用 Worker 的 startWork() 方法,执行我们自定义 worker 的任务,并返回相应的 result。
任务完成后,WorkerWrapper 会根据 result 对任务状态,db 等进行更新,然后 schedule 下一个任务。
WorkManager 任务执行流程图:
4.3 带约束的任务的执行过程
// Constraint: the task requires that the battery is not low.
val constraints = Constraints.Builder()
.setRequiresBatteryNotLow(true)
.build()
// One-time request for Worker2 that carries the constraint above.
val work2Request = OneTimeWorkRequestBuilder<Worker2>()
.setConstraints(constraints)
.build()
// Enqueue the request on the WorkManager instance obtained from this Context.
WorkManager.getInstance(this).enqueue(work2Request)
/**
 * Stores the given constraints on the builder's WorkSpec.
 *
 * @param constraints the constraints to attach to the work
 * @return the value of {@code getThis()}, to allow call chaining
 */
public final B setConstraints( Constraints constraints) {
mWorkSpec.constraints = constraints;
return getThis();
}
//SystemJobService
@Override
public boolean onStartJob(@NonNull JobParameters params) {
... ...
String workSpecId = getWorkSpecIdFromJobParameters(params);
... ...
synchronized (mJobParameters) {
... ...
mJobParameters.put(workSpecId, params);
}
... ...
mWorkManagerImpl.startWork(workSpecId, runtimeExtras);
return true;
}
//SystemJobScheduler
/**
 * Convenience constructor: looks up the platform {@link JobScheduler} service and
 * creates a {@link SystemJobInfoConverter}, then delegates to the four-argument
 * constructor.
 */
public SystemJobScheduler( Context context, WorkManagerImpl workManager) {
this(context,
workManager,
(JobScheduler) context.getSystemService(JOB_SCHEDULER_SERVICE),
new SystemJobInfoConverter(context));
}
SystemJobScheduler 的 schedule() 方法执行了 scheduleInternal():
//SystemJobScheduler
public void scheduleInternal(WorkSpec workSpec, int jobId) {
JobInfo jobInfo = mSystemJobInfoConverter.convert(workSpec, jobId);
Logger.get().debug(
TAG,
String.format("Scheduling work ID %s Job ID %s", workSpec.id, jobId));
try {
mJobScheduler.schedule(jobInfo);
} catch (IllegalStateException e) {
... ...
throw new IllegalStateException(message, e);
} catch (Throwable throwable) {
// OEM implementation bugs in JobScheduler cause the app to crash. Avoid crashing.
Logger.get().error(TAG, String.format("Unable to schedule %s", workSpec), throwable);
}
}
SystemJobInfoConverter.convert() 方法就是创建了一个 JobInfo,并将 Constraints 里的约束条件赋予 JobInfo 对象,之后便执行了 JobScheduler.schedule(),根据约束条件对任务进行调度。
<!-- ConstraintProxy$BatteryNotLowProxy: declared disabled (android:enabled="false") and
     not exported; its filter lists BATTERY_OKAY and BATTERY_LOW. -->
<receiver android:directBootAware="false" android:enabled="false" android:exported="false" android:name="androidx.work.impl.background.systemalarm.ConstraintProxy$BatteryNotLowProxy">
<intent-filter>
<action android:name="android.intent.action.BATTERY_OKAY"/>
<action android:name="android.intent.action.BATTERY_LOW"/>
</intent-filter>
</receiver>
//ConstraintProxy
/** Proxy subclass with an empty body; all behavior comes from {@link ConstraintProxy}. */
public static class BatteryNotLowProxy extends ConstraintProxy {
}
/**
 * Logs the received intent and starts a service with the constraints-changed
 * intent created by {@link CommandHandler}.
 *
 * @param context context used to create the intent and start the service
 * @param intent  the received broadcast; only logged
 */
public void onReceive(Context context, Intent intent) {
    Logger.get().debug(TAG, String.format("onReceive : %s", intent));
    context.startService(CommandHandler.createConstraintsChangedIntent(context));
}
//CommandHandler
/**
 * Creates an intent addressed to {@link SystemAlarmService} whose action is
 * {@code ACTION_CONSTRAINTS_CHANGED}.
 *
 * @param context context used to build the intent
 * @return the new intent
 */
static Intent createConstraintsChangedIntent(@NonNull Context context) {
Intent intent = new Intent(context, SystemAlarmService.class);
intent.setAction(ACTION_CONSTRAINTS_CHANGED);
return intent;
}
public int onStartCommand(Intent intent, int flags, int startId) {
super.onStartCommand(intent, flags, startId);
... ...
if (intent != null) {
mDispatcher.add(intent, startId);
}
// If the service were to crash, we want all unacknowledged Intents to get redelivered.
return Service.START_REDELIVER_INTENT;
}
//SystemAlarmDispatcher
public boolean add(@NonNull final Intent intent, final int startId) {
... ...
if (CommandHandler.ACTION_CONSTRAINTS_CHANGED.equals(action)
&& hasIntentWithAction(CommandHandler.ACTION_CONSTRAINTS_CHANGED)) {
return false;
}
intent.putExtra(KEY_START_ID, startId);
synchronized (mIntents) {
boolean hasCommands = !mIntents.isEmpty();
mIntents.add(intent);
if (!hasCommands) {
// Only call processCommand if this is the first command.
// The call to dequeueAndCheckForCompletion will process the remaining commands
// in the order that they were added.
processCommand();
}
}
return true;
}
//SystemAlarmDispatcher
mCommandHandler.onHandleIntent(mCurrentIntent, startId,
SystemAlarmDispatcher.this);
//CommandHandler
if (ACTION_CONSTRAINTS_CHANGED.equals(action)) {
handleConstraintsChanged(intent, startId, dispatcher);
}
//CommandHandler
/**
 * Handles a constraints-changed command by delegating to a new
 * {@link ConstraintsCommandHandler}.
 *
 * @param intent     the command intent; only logged here
 * @param startId    passed to the ConstraintsCommandHandler
 * @param dispatcher passed to the ConstraintsCommandHandler
 */
private void handleConstraintsChanged(
@NonNull Intent intent, int startId,
@NonNull SystemAlarmDispatcher dispatcher) {
Logger.get().debug(TAG, String.format("Handling constraints changed %s", intent));
// Constraints changed command handler is synchronous. No cleanup
// is necessary.
ConstraintsCommandHandler changedCommandHandler =
new ConstraintsCommandHandler(mContext, startId, dispatcher);
changedCommandHandler.handleConstraintsChanged();
}
//ConstraintsCommandHandler
Intent intent = CommandHandler.createDelayMetIntent(mContext, workSpecId);
Logger.get().debug(TAG, String.format(
"Creating a delay_met command for workSpec with id (%s)", workSpecId));
mDispatcher.postOnMainThread(
new SystemAlarmDispatcher.AddRunnable(mDispatcher, intent, mStartId));
//CommandHandler
else if (ACTION_DELAY_MET.equals(action)) {
handleDelayMet(intent, startId, dispatcher);
}
public void onAllConstraintsMet( List<String> workSpecIds) {
... ...
synchronized (mLock) {
if (mCurrentState == STATE_INITIAL) {
... ...
boolean isEnqueued = mDispatcher.getProcessor().startWork(mWorkSpecId);
... ...
} else {
Logger.get().debug(TAG, String.format("Already started work for %s", mWorkSpecId));
}
}
}
结语
长按右侧二维码
查看更多开发者精彩分享
"开发者说·DTalk" 面向中国开发者们征集 Google 移动应用 (apps & games) 相关的产品/技术内容。欢迎大家前来分享您对移动应用的行业洞察或见解、移动开发过程中的心得或新发现、以及应用出海的实战经验总结和相关产品的使用反馈等。我们由衷地希望可以给这些出众的中国开发者们提供更好展现自己、充分发挥自己特长的平台。我们将通过大家的技术内容着重选出优秀案例进行谷歌开发技术专家 (GDE) 的推荐。
点击屏末 | 阅读原文 | 即刻报名参与 "开发者说·DTalk"