Merge tag 'wq-for-6.19' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq

Pull workqueue updates from Tejun Heo:

 - Rescuer affinity management: the rescuer's affinity is now updated
   only while it is detached, and consistently uses wq_unbound_cpumask.
   Workers in DISASSOCIATED pools also follow unbound cpumask changes
   to avoid breaking CPU isolation (a userspace sketch of the binding
   fallback follows this list)

 - Rescuer cleanups preparing for fetching work items one by one from
   the pool list: work assignment factored out into a helper, optimized
   to skip pwqs that no longer need rescue, and shutdown logic
   simplified

 - Unused assert_rcu_or_wq_mutex_or_pool_mutex() removed

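As a quick illustration of the first item: the init_rescuer() hunk below
binds the rescuer to wq_unbound_cpumask only while that mask still
intersects cpu_active_mask, and falls back to cpu_possible_mask otherwise.
A minimal userspace sketch of the same fallback pattern, using the glibc
affinity API rather than kernel cpumasks (the preferred mask chosen here
is made up for illustration):

#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	cpu_set_t preferred, active, effective;
	long ncpus = sysconf(_SC_NPROCESSORS_ONLN);

	CPU_ZERO(&preferred);
	CPU_SET(0, &preferred);			/* stand-in for wq_unbound_cpumask */

	CPU_ZERO(&active);
	for (long i = 0; i < ncpus; i++)	/* stand-in for cpu_active_mask */
		CPU_SET(i, &active);

	/* bind to the preferred mask only if it has a usable CPU ... */
	CPU_AND(&effective, &preferred, &active);
	if (!CPU_COUNT(&effective))		/* ... else fall back to all CPUs */
		effective = active;

	if (sched_setaffinity(0, sizeof(effective), &effective))
		perror("sched_setaffinity");
	printf("bound to %d cpu(s)\n", CPU_COUNT(&effective));
	return 0;
}
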
* tag 'wq-for-6.19' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq:
  workqueue: Don't rely on wq->rescuer to stop rescuer
  workqueue: Only assign rescuer work when really needed
  workqueue: Factor out assign_rescuer_work()
  workqueue: Init rescuer's affinities as wq_unbound_cpumask
  workqueue: Let DISASSOCIATED workers follow unbound wq cpumask changes
  workqueue: Update the rescuer's affinity only when it is detached
  workqueue: Remove unused assert_rcu_or_wq_mutex_or_pool_mutex
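
The "Don't rely on wq->rescuer to stop rescuer" change works because
kthread_stop() does not kill the thread: it only sets the should-stop
flag and then waits for the thread function to return, so the rescuer can
drain wq->maydays before exiting. A minimal module sketch of that
contract (hypothetical demo code, not from the patch; the pending counter
stands in for the maydays list):

#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/delay.h>
#include <linux/err.h>

static struct task_struct *worker;
static int pending = 3;		/* stand-in for queued mayday requests */

static int worker_fn(void *data)
{
	/* keep draining even after stop is requested, like the rescuer */
	while (!kthread_should_stop() || pending) {
		if (pending)
			pr_info("drained one, %d left\n", --pending);
		msleep(10);
	}
	return 0;	/* only now does kthread_stop() return */
}

static int __init demo_init(void)
{
	worker = kthread_run(worker_fn, NULL, "demo-rescuer");
	return IS_ERR(worker) ? PTR_ERR(worker) : 0;
}

static void __exit demo_exit(void)
{
	kthread_stop(worker);	/* blocks until worker_fn() returns */
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");
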
Committed by Linus Torvalds on 2025-12-03 12:50:10 -08:00

--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -541,12 +541,6 @@ static void show_one_worker_pool(struct worker_pool *pool);
 			 !lockdep_is_held(&wq_pool_mutex),		\
 			 "RCU or wq_pool_mutex should be held")
 
-#define assert_rcu_or_wq_mutex_or_pool_mutex(wq)			\
-	RCU_LOCKDEP_WARN(!rcu_read_lock_any_held() &&			\
-			 !lockdep_is_held(&wq->mutex) &&		\
-			 !lockdep_is_held(&wq_pool_mutex),		\
-			 "RCU, wq->mutex or wq_pool_mutex should be held")
-
 #define for_each_bh_worker_pool(pool, cpu)				\
 	for ((pool) = &per_cpu(bh_worker_pools, cpu)[0];		\
 	     (pool) < &per_cpu(bh_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -3443,6 +3437,27 @@ sleep:
 	goto woke_up;
 }
 
+static bool assign_rescuer_work(struct pool_workqueue *pwq, struct worker *rescuer)
+{
+	struct worker_pool *pool = pwq->pool;
+	struct work_struct *work, *n;
+
+	/* need rescue? */
+	if (!pwq->nr_active || !need_to_create_worker(pool))
+		return false;
+
+	/*
+	 * Slurp in all works issued via this workqueue and
+	 * process'em.
+	 */
+	list_for_each_entry_safe(work, n, &pool->worklist, entry) {
+		if (get_work_pwq(work) == pwq && assign_work(work, rescuer, &n))
+			pwq->stats[PWQ_STAT_RESCUED]++;
+	}
+
+	return !list_empty(&rescuer->scheduled);
+}
+
 /**
  * rescuer_thread - the rescuer thread function
  * @__rescuer: self
@@ -3497,7 +3512,6 @@ repeat:
 		struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
 					struct pool_workqueue, mayday_node);
 		struct worker_pool *pool = pwq->pool;
-		struct work_struct *work, *n;
 
 		__set_current_state(TASK_RUNNING);
 		list_del_init(&pwq->mayday_node);
@@ -3508,18 +3522,9 @@ repeat:
 
 		raw_spin_lock_irq(&pool->lock);
 
-		/*
-		 * Slurp in all works issued via this workqueue and
-		 * process'em.
-		 */
 		WARN_ON_ONCE(!list_empty(&rescuer->scheduled));
-		list_for_each_entry_safe(work, n, &pool->worklist, entry) {
-			if (get_work_pwq(work) == pwq &&
-			    assign_work(work, rescuer, &n))
-				pwq->stats[PWQ_STAT_RESCUED]++;
-		}
 
-		if (!list_empty(&rescuer->scheduled)) {
+		if (assign_rescuer_work(pwq, rescuer)) {
 			process_scheduled_works(rescuer);
 
 			/*
@@ -3534,10 +3539,9 @@ repeat:
 		if (pwq->nr_active && need_to_create_worker(pool)) {
 			raw_spin_lock(&wq_mayday_lock);
 			/*
-			 * Queue iff we aren't racing destruction
-			 * and somebody else hasn't queued it already.
+			 * Queue iff somebody else hasn't queued it already.
 			 */
-			if (wq->rescuer && list_empty(&pwq->mayday_node)) {
+			if (list_empty(&pwq->mayday_node)) {
 				get_pwq(pwq);
 				list_add_tail(&pwq->mayday_node, &wq->maydays);
 			}
@@ -5376,11 +5380,6 @@ static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
 	/* update node_nr_active->max */
 	wq_update_node_max_active(ctx->wq, -1);
 
-	/* rescuer needs to respect wq cpumask changes */
-	if (ctx->wq->rescuer)
-		set_cpus_allowed_ptr(ctx->wq->rescuer->task,
-				     unbound_effective_cpumask(ctx->wq));
-
 	mutex_unlock(&ctx->wq->mutex);
 }
@@ -5614,10 +5613,13 @@ static int init_rescuer(struct workqueue_struct *wq)
 	}
 
 	wq->rescuer = rescuer;
-	if (wq->flags & WQ_UNBOUND)
-		kthread_bind_mask(rescuer->task, unbound_effective_cpumask(wq));
+	/* initial cpumask is consistent with the detached rescuer and unbind_worker() */
+	if (cpumask_intersects(wq_unbound_cpumask, cpu_active_mask))
+		kthread_bind_mask(rescuer->task, wq_unbound_cpumask);
+	else
+		kthread_bind_mask(rescuer->task, cpu_possible_mask);
 	wake_up_process(rescuer->task);
 
 	return 0;
@@ -5902,16 +5904,10 @@ void destroy_workqueue(struct workqueue_struct *wq)
 
 	/* kill rescuer, if sanity checks fail, leave it w/o rescuer */
 	if (wq->rescuer) {
-		struct worker *rescuer = wq->rescuer;
-
-		/* this prevents new queueing */
-		raw_spin_lock_irq(&wq_mayday_lock);
-		wq->rescuer = NULL;
-		raw_spin_unlock_irq(&wq_mayday_lock);
-
 		/* rescuer will empty maydays list before exiting */
-		kthread_stop(rescuer->task);
-		kfree(rescuer);
+		kthread_stop(wq->rescuer->task);
+		kfree(wq->rescuer);
+		wq->rescuer = NULL;
 	}
 
 	/*
@@ -6937,8 +6933,26 @@ static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
 	}
 
 	if (!ret) {
+		int cpu;
+		struct worker_pool *pool;
+		struct worker *worker;
+
 		mutex_lock(&wq_pool_attach_mutex);
 		cpumask_copy(wq_unbound_cpumask, unbound_cpumask);
+		/* rescuer needs to respect cpumask changes when it is not attached */
+		list_for_each_entry(wq, &workqueues, list) {
+			if (wq->rescuer && !wq->rescuer->pool)
+				unbind_worker(wq->rescuer);
+		}
+		/* DISASSOCIATED worker needs to respect wq_unbound_cpumask */
+		for_each_possible_cpu(cpu) {
+			for_each_cpu_worker_pool(pool, cpu) {
+				if (!(pool->flags & POOL_DISASSOCIATED))
+					continue;
+				for_each_pool_worker(worker, pool)
+					unbind_worker(worker);
+			}
+		}
 		mutex_unlock(&wq_pool_attach_mutex);
 	}
 	return ret;
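
A side note on the assign_rescuer_work() hunk above: the slurp loop uses
list_for_each_entry_safe() (and hands &n to assign_work()) because a
matched work item is moved off pool->worklist mid-walk. A minimal
userspace sketch of the same unlink-while-iterating pattern, with a
hypothetical singly linked work list instead of the kernel's list_head:

#include <stdio.h>
#include <stdlib.h>

struct work { int id; struct work *next; };

int main(void)
{
	struct work *head = NULL, **pp;

	/* build a list 1..5 */
	for (int i = 5; i >= 1; i--) {
		struct work *w = malloc(sizeof(*w));
		w->id = i;
		w->next = head;
		head = w;
	}

	/*
	 * Deletion-safe walk: pp always points at the link to the
	 * current node, so unlinking it never invalidates the cursor.
	 */
	for (pp = &head; *pp; ) {
		struct work *w = *pp;
		if (w->id % 2) {		/* pretend get_work_pwq() matched */
			*pp = w->next;		/* unlink == "rescued" */
			printf("rescued work %d\n", w->id);
			free(w);
		} else {
			pp = &w->next;
		}
	}

	for (struct work *w = head; w; w = w->next)
		printf("still on worklist: %d\n", w->id);
	while (head) {
		struct work *n = head->next;
		free(head);
		head = n;
	}
	return 0;
}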