Chapter 8: Bottom Halves and Deferring Work
Because of various limitations, the interrupt handler can form only the first half of any interrupt processing flow. Interrupts occur asynchronously, and to avoid blocking other code paths (including other interrupt handling) for too long, the interrupt handler needs to run as quickly as possible. By default, the current interrupt line is masked while its handler runs; if the handler drags on, interrupts stay disabled too long, which is hostile to time-sensitive work. Networking is the classic example: keeping interrupts off for too long can cause packet loss.
Bottom Halves
An interrupt handler generally needs to acknowledge to the hardware that the interrupt was received, and it may need to copy data from the hardware. There is no hard rule about which work belongs in the top half and which in the bottom half, but some useful guidelines:
- If the work is time sensitive, perform it in the interrupt handler.
- If the work is related to the hardware, perform it in the interrupt handler.
- If the work needs to ensure that another interrupt (particularly the same interrupt) does not interrupt it, perform it in the interrupt handler.
- For everything else, consider performing the work in the bottom half.
Why Bottom Halves?
When an interrupt occurs, the current interrupt line is masked on all processors. Worse, handlers registered with the IRQF_DISABLED flag run with all interrupt lines disabled on the local processor, in addition to the current line being masked everywhere. Minimizing the time spent in interrupt handlers is therefore essential for system performance.
A bottom half can defer work until any later time, although typically it runs as soon as the interrupt returns. The key point is that bottom halves run with all interrupts enabled.
Softirqs and Tasklets
Softirqs are statically created and must be registered at compile time. They can run concurrently on any processor; even two softirqs of the same type can run simultaneously. Tasklets are dynamically created on top of softirqs. Two different tasklets can run concurrently on different processors, but two tasklets of the same type cannot run simultaneously. In most cases tasklets are the recommended choice; softirqs are the better option only when performance is critical.
Softirqs
Softirqs are represented by struct softirq_action, defined in <linux/interrupt.h>:
struct softirq_action
{
        void (*action)(struct softirq_action *);
};
A 32-entry array of these structures is declared in kernel/softirq.c:
static struct softirq_action softirq_vec[NR_SOFTIRQS];
Each registered softirq consumes one entry in this array.
The prototype of a softirq handler looks like:
void softirq_handler(struct softirq_action *)
A softirq never preempts another softirq; the only event that can preempt a softirq is an interrupt handler.
Executing Softirqs
A softirq must be marked before it will execute; this is called raising the softirq. Usually, the interrupt handler marks its softirq for execution. Pending softirqs are checked for, and executed, in the following places:
- In the return from hardware interrupt code path (see the sketch after this list).
- In the ksoftirqd kernel thread
- In any code that explicitly checks for and executes pending softirqs, such as the networking subsystem
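For the first case, the check happens on the way out of hardware interrupt handling, in irq_exit(). A simplified sketch of the relevant logic in kernel/softirq.c (details vary across kernel versions):
void irq_exit(void)
{
        preempt_count_sub(HARDIRQ_OFFSET);      /* leave hardirq context */

        /*
         * If we are not nested inside another interrupt and a softirq
         * was raised while servicing this one, process softirqs now.
         */
        if (!in_interrupt() && local_softirq_pending())
                invoke_softirq();
}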
Softirq execution takes place in do_softirq(), which calls __do_softirq(). The core is:
u32 pending;

/* 32-bit mask of pending softirqs: if bit n is set,
 * the nth softirq is pending */
pending = local_softirq_pending();
if (pending) {
        struct softirq_action *h;

        /* reset the pending bitmask */
        set_softirq_pending(0);

        h = softirq_vec;
        do {
                if (pending & 1)
                        h->action(h);
                h++;
                pending >>= 1;
        } while (pending);
}
The flow is straightforward: if any softirqs are pending, __do_softirq() walks the vector and invokes each corresponding handler. In detail:
- It sets the pending local variable to the value returned by the local_softirq_pending() macro. This is a 32-bit mask of pending softirqs: if bit n is set, the nth softirq is pending.
- Now that the pending bitmask of softirqs is saved, it clears the actual bitmask.
- The pointer h is set to the first entry in softirq_vec.
- If the first bit in pending is set, h->action(h) is called.
- The pointer h is incremented by one so that it now points to the second entry in the softirq_vec array.
- The bitmask pending is right-shifted by one. This tosses the first bit away and moves all other bits one place to the right. Consequently, the second bit is now the first (and so on).
- The pointer h now points to the second entry in the array, and the pending bitmask now has the second bit as the first. Repeat the previous steps.
- Continue repeating until pending is zero, at which point there are no more pending softirqs and the work is done.
Using Softirqs
Softirqs are reserved for the most timing-critical processing in the system. Currently only the networking and block device subsystems use softirqs directly, although tasklets and kernel timers are themselves built on top of softirqs.
Assigning an Index
A new softirq's index is chosen according to its desired priority: the lower the numerical value, the higher the priority. By convention, a new entry is usually placed between BLOCK_SOFTIRQ and TASKLET_SOFTIRQ. The existing indexes are sketched below.
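For reference, the softirq index enum in <linux/interrupt.h> looks roughly like this (the exact set of entries varies across kernel versions):
enum
{
        HI_SOFTIRQ = 0,         /* high-priority tasklets */
        TIMER_SOFTIRQ,          /* timers */
        NET_TX_SOFTIRQ,         /* send network packets */
        NET_RX_SOFTIRQ,         /* receive network packets */
        BLOCK_SOFTIRQ,          /* block devices */
        TASKLET_SOFTIRQ,        /* normal tasklets */
        SCHED_SOFTIRQ,          /* the scheduler */
        HRTIMER_SOFTIRQ,        /* high-resolution timers */
        RCU_SOFTIRQ,            /* RCU callbacks */

        NR_SOFTIRQS
};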
Registering a Handler
The handler is registered at run time via open_softirq(), which takes two parameters: the softirq's index and its handler function. The networking subsystem, for example, registers:
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
The softirq handler runs with interrupts enabled and cannot sleep. While it runs, softirqs on the current processor are disabled, but another processor can execute softirqs simultaneously, including the very same softirq. Consequently, any data shared between softirq handlers, even two instances of the same handler, needs proper locking. This is why tasklets are usually preferred.
Raising Your Softirq
After the handler is registered, call raise_softirq() to mark it pending. The networking subsystem, for example, calls:
raise_softirq(NET_TX_SOFTIRQ);
void raise_softirq(unsigned int nr)
{
        unsigned long flags;

        local_irq_save(flags);
        raise_softirq_irqoff(nr);
        local_irq_restore(flags);
}
If interrupts are already off, raise_softirq_irqoff() can be called directly, skipping the save/restore pair.
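Putting the pieces together, here is a minimal sketch of a hypothetical softirq user. FOO_SOFTIRQ, foo_action(), foo_irq_handler(), and the foo_* helpers are invented names for illustration; a real softirq would also need its index added to the enum shown earlier:
#include <linux/interrupt.h>

static void foo_process_events(void) { /* hypothetical device work */ }
static void foo_ack_hardware(void)   { /* hypothetical hardware ack */ }

/* Softirq handler: runs with interrupts on, must not sleep. */
static void foo_action(struct softirq_action *h)
{
        /*
         * Remember: the same softirq may run concurrently on another
         * processor, so any shared data touched here needs locking.
         */
        foo_process_events();
}

static int __init foo_init(void)
{
        open_softirq(FOO_SOFTIRQ, foo_action);  /* register the handler */
        return 0;
}

/* Top half: acknowledge the hardware, then defer the real work. */
static irqreturn_t foo_irq_handler(int irq, void *dev)
{
        foo_ack_hardware();
        raise_softirq(FOO_SOFTIRQ);     /* mark pending; runs soon */
        return IRQ_HANDLED;
}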
Tasklets
Tasklets是一种基于softirq实现的中断下半部。如前所述,tasklet 由两个软中断表示:HI_SOFTIRQ
和 TASKLET_SOFTIRQ
。
Tasklets are represented by struct tasklet_struct:
struct tasklet_struct {
        struct tasklet_struct *next;    /* next tasklet in the list */
        unsigned long state;            /* state of the tasklet */
        atomic_t count;                 /* reference counter */
        void (*func)(unsigned long);    /* tasklet handler function */
        unsigned long data;             /* argument to the tasklet function */
};
The state field is exactly zero, TASKLET_STATE_SCHED, or TASKLET_STATE_RUN. TASKLET_STATE_SCHED denotes a tasklet that is scheduled to run; TASKLET_STATE_RUN denotes a tasklet that is currently running.
The count field is used as a reference count for the tasklet. If it is nonzero, the tasklet is disabled and cannot run; if it is zero, the tasklet is enabled and can run if marked pending.
Scheduling Tasklets
Scheduling tasklets is analogous to raising softirqs. Scheduled tasklets are stored in tasklet_vec (regular tasklets) and tasklet_hi_vec (high-priority tasklets), per-processor linked lists of tasklet_struct structures. Scheduling is performed by tasklet_schedule() and tasklet_hi_schedule(), which in turn call __tasklet_schedule() and __tasklet_hi_schedule(), respectively. The flow:
void __tasklet_schedule(struct tasklet_struct *t)
{
        __tasklet_schedule_common(t, &tasklet_vec,
                                  TASKLET_SOFTIRQ);
}

static void __tasklet_schedule_common(struct tasklet_struct *t,
                                      struct tasklet_head __percpu *headp,
                                      unsigned int softirq_nr)
{
        struct tasklet_head *head;
        unsigned long flags;

        local_irq_save(flags);
        head = this_cpu_ptr(headp);
        t->next = NULL;
        *head->tail = t;
        head->tail = &(t->next);
        raise_softirq_irqoff(softirq_nr);
        local_irq_restore(flags);
}
- Check whether the tasklet's state is TASKLET_STATE_SCHED. If it is, the tasklet is already scheduled to run and the function can immediately return (this check lives in the tasklet_schedule() wrapper, shown after this list).
- Call __tasklet_schedule().
- Save the state of the interrupt system, and then disable local interrupts.
- Add the tasklet to be scheduled to the tail of the tasklet_vec or tasklet_hi_vec linked list, which is unique to each processor in the system.
- Raise the TASKLET_SOFTIRQ or HI_SOFTIRQ softirq, so do_softirq() executes this tasklet in the near future.
- Restore interrupts to their previous state and return.
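The TASKLET_STATE_SCHED check in the first step is performed by the tasklet_schedule() wrapper itself, roughly as follows:
static inline void tasklet_schedule(struct tasklet_struct *t)
{
        /*
         * The atomic test-and-set guarantees a tasklet is never queued
         * twice: only the caller that flips the bit does the scheduling.
         */
        if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
                __tasklet_schedule(t);
}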
The core of tasklet execution is tasklet_action() and tasklet_hi_action(), both thin wrappers around tasklet_action_common(). The flow:
static inline int tasklet_trylock(struct tasklet_struct *t)
{
        return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}

static inline void tasklet_unlock(struct tasklet_struct *t)
{
        smp_mb__before_atomic();
        clear_bit(TASKLET_STATE_RUN, &(t)->state);
}

static void tasklet_action_common(struct softirq_action *a,
                                  struct tasklet_head *tl_head,
                                  unsigned int softirq_nr)
{
        struct tasklet_struct *list;

        local_irq_disable();
        list = tl_head->head;
        tl_head->head = NULL;
        tl_head->tail = &tl_head->head;
        local_irq_enable();

        while (list) {
                struct tasklet_struct *t = list;

                list = list->next;

                if (tasklet_trylock(t)) {
                        if (!atomic_read(&t->count)) {
                                if (!test_and_clear_bit(TASKLET_STATE_SCHED,
                                                        &t->state))
                                        BUG();
                                t->func(t->data);
                                tasklet_unlock(t);
                                continue;
                        }
                        tasklet_unlock(t);
                }

                /* Could not run it now (already running elsewhere, or
                 * disabled): re-queue it and re-raise the softirq. */
                local_irq_disable();
                t->next = NULL;
                *tl_head->tail = t;
                tl_head->tail = &t->next;
                __raise_softirq_irqoff(softirq_nr);
                local_irq_enable();
        }
}
- Disable local interrupt delivery and retrieve the tasklet_vec or tasklet_hi_vec list for this processor.
- Clear the list for this processor by setting it equal to NULL.
- Enable local interrupt delivery.
- Loop over each pending tasklet in the retrieved list.
- If this is a multiprocessing machine, check whether the tasklet is running on another processor by checking the TASKLET_STATE_RUN flag. If it is currently running, do not execute it now and skip to the next pending tasklet.
- If the tasklet is not currently running, set the TASKLET_STATE_RUN flag, so another processor will not run it.
- Check for a zero count value, to ensure that the tasklet is not disabled. If the tasklet is disabled, skip it and go to the next pending tasklet.
- We now know that the tasklet is not running elsewhere, is marked as running so it will not start running elsewhere, and has a zero count value. Run the tasklet handler.
- After the tasklet runs, clear the TASKLET_STATE_RUN flag in the tasklet's state field.
- Repeat for the next pending tasklet, until there are no more scheduled tasklets waiting to run.
Using Tasklets
Static declaration:
DECLARE_TASKLET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data)
The first macro creates the tasklet with a count of zero, so the tasklet is enabled. The second macro sets count to one, so the tasklet is disabled. Dynamic creation:
Given a pointer t to a struct tasklet_struct:
tasklet_init(t, tasklet_handler, dev); /* dynamically as opposed to statically */
Like softirqs, tasklets cannot sleep. To schedule a tasklet for execution:
tasklet_schedule(&my_tasklet); /* mark my_tasklet as pending */
As an optimization, a tasklet always runs on the processor that scheduled it, in the hope of making better use of that processor's cache.
A tasklet can be disabled with tasklet_disable(), which waits for a currently running handler to finish before returning. If you do not want to wait, use tasklet_disable_nosync(). tasklet_enable() enables the tasklet again. tasklet_kill() removes a tasklet from its pending queue and waits for any running handler to complete; it cannot be used from interrupt context, because it sleeps.
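The following minimal sketch ties these calls together, using the classic unsigned-long-argument tasklet API shown above; my_device, my_tasklet_handler, and my_irq_handler are invented names for illustration:
#include <linux/interrupt.h>

static struct my_device { int irq; } my_dev;    /* hypothetical device */

/* Tasklet handler: softirq context, must not sleep. Two instances of
 * the same tasklet never run concurrently, so no self-locking needed. */
static void my_tasklet_handler(unsigned long data)
{
        struct my_device *dev = (struct my_device *)data;
        /* deferred processing of dev goes here */
}

/* Statically declared with count == 0, i.e. enabled. */
DECLARE_TASKLET(my_tasklet, my_tasklet_handler, (unsigned long)&my_dev);

static irqreturn_t my_irq_handler(int irq, void *dev)
{
        /* Top half: acknowledge the hardware, then defer the rest. */
        tasklet_schedule(&my_tasklet);
        return IRQ_HANDLED;
}

static void my_teardown(void)
{
        /* Dequeue and wait for a running handler; may sleep,
         * so process context only. */
        tasklet_kill(&my_tasklet);
}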
ksoftirqd
Softirq processing is aided by a set of per-CPU kernel threads, the ksoftirqd threads (scheduled normally under CFS, at nice 19). Their basic flow:
for (;;) {
        if (!softirq_pending(cpu))
                schedule();

        set_current_state(TASK_RUNNING);

        while (softirq_pending(cpu)) {
                do_softirq();
                if (need_resched())
                        schedule();
        }

        set_current_state(TASK_INTERRUPTIBLE);
}
Why is the ksoftirqd kernel thread needed? Because a softirq can re-raise itself while it runs, the networking subsystem being the prime example, which can starve user-space processes. One alternative is not to process a re-raised softirq immediately but to wait until the next interrupt arrives; that avoids user-space starvation, but can starve the softirqs instead, especially on an otherwise idle system. ksoftirqd is the compromise: re-raised softirqs are handed to the per-CPU kernel thread, so user processes can preempt the work when they need the CPU, while on an idle system the thread is scheduled almost immediately and the softirqs still run promptly.
Work Queues
Work queues defer work into kernel threads (worker threads, the kworker threads in current kernels). This bottom half therefore runs in process context and can sleep.
Data Structures
/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct cpu_wq[NR_CPUS];    /* one per CPU */
        struct list_head list;
        const char *name;
        int singlethread;
        int freezeable;
        int rt;
};
struct cpu_workqueue_struct {
        spinlock_t lock;                /* lock protecting this structure */
        struct list_head worklist;      /* list of work */
        wait_queue_head_t more_work;
        struct work_struct *current_struct;
        struct workqueue_struct *wq;    /* associated workqueue_struct */
        task_t *thread;                 /* associated thread */
};
The kernel thread's worker_thread() function loops, pulling work off the list and running it. Its simplified core:
for (;;) {
        prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
        if (list_empty(&cwq->worklist))
                schedule();
        finish_wait(&cwq->more_work, &wait);
        run_workqueue(cwq);
}
- The thread marks itself sleeping (the task's state is set to TASK_INTERRUPTIBLE) and adds itself to a wait queue.
- If the linked list of work is empty, the thread calls schedule() and goes to sleep.
- If the list is not empty, the thread does not go to sleep. Instead, it marks itself TASK_RUNNING and removes itself from the wait queue.
- If the list is nonempty, the thread calls run_workqueue() to perform the deferred work.
A simplified flow of run_workqueue():
while (!list_empty(&cwq->worklist)) {
        struct work_struct *work;
        work_func_t f;
        void *data;

        work = list_entry(cwq->worklist.next, struct work_struct, entry);
        f = work->func;

        list_del_init(cwq->worklist.next);
        work_clear_pending(work);
        f(work);
}
- While the list is not empty, it grabs the next entry in the list.
- It retrieves the function that should be called, func, and its argument, data.
- It removes this entry from the list and clears the pending bit in the structure itself.
- It invokes the function.
- Repeat.
Work is represented by struct work_struct, defined in <linux/workqueue.h>:
struct work_struct {
        atomic_long_t data;
        struct list_head entry;
        work_func_t func;
};
Using Work Queues
Static creation:
DECLARE_WORK(name, void (*func)(void *), void *data);
Dynamic creation:
INIT_WORK(struct work_struct *work, void (*func)(void *), void *data);
Scheduling Work
To put the work on the default work queue:
schedule_work(&work);
schedule_delayed_work(&work, delay); /* delay is in ticks */
Flushing Work
void flush_scheduled_work(void)
This function does not return until all scheduled work has executed. It sleeps while waiting, so it can be called only from process context.
Cancelling Delayed Work
int cancel_delayed_work(struct work_struct *work);
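A minimal sketch of the default-queue lifecycle, using the older work queue API quoted in this section; my_work_handler and the surrounding function names are invented:
#include <linux/workqueue.h>

/* Work handler: runs in process context and may sleep. */
static void my_work_handler(void *data)
{
        /* deferred, sleep-capable processing goes here */
}

/* Statically create the work item, bound to its handler. */
DECLARE_WORK(my_work, my_work_handler, NULL);

static void my_bottom_half(void)
{
        schedule_work(&my_work);                 /* run as soon as possible */
        /* or: schedule_delayed_work(&my_work, 5 * HZ);  run 5 seconds later */
}

static void my_cleanup(void)
{
        cancel_delayed_work(&my_work);  /* cancel if not yet started */
        flush_scheduled_work();         /* wait for anything still running */
}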
Creating New Work Queues
If the default work queue cannot satisfy your needs, you can create your own work queue ~~, for example in performance-critical scenarios~~. For performance-critical scenarios, newer kernels provide the system_highpri_wq queue instead.
struct workqueue_struct *create_workqueue(const char *name);
The name argument names the kernel threads; this function creates one worker thread per processor in the system.
Once the work queue is created, the following two functions are the analogues of schedule_work() and schedule_delayed_work(), operating on the given queue instead of the default one:
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
Flushing a Work Queue
flush_workqueue(struct workqueue_struct *wq)
This function behaves like flush_scheduled_work(), but waits on the given work queue.
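The following sketch pulls the custom-queue calls together; my_wq and the function names are invented, and create_workqueue() is the older interface quoted above:
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;

static int my_setup(void)
{
        /* Spawns one worker thread per CPU, named after the queue. */
        my_wq = create_workqueue("my_wq");
        if (!my_wq)
                return -ENOMEM;
        return 0;
}

static void my_defer(struct work_struct *work)
{
        queue_work(my_wq, work);        /* analogue of schedule_work() */
}

static void my_teardown(void)
{
        flush_workqueue(my_wq);         /* analogue of flush_scheduled_work() */
        destroy_workqueue(my_wq);
}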
Which Bottom Half Should I Use?
In short: softirqs for the most timing-critical and highly concurrent work, tasklets for ordinary interrupt-context deferral, and work queues whenever the deferred work needs to sleep.
Locking Between Bottom Halves
If process context code and a bottom half share data, you need to disable bottom-half processing and obtain a lock before accessing the data.
If interrupt context code and a bottom half share data, you need to disable interrupts and obtain a lock before accessing the data.
Note that the functions that disable bottom-half processing (local_bh_disable() and local_bh_enable()) do not disable work queues; because work queues run in process context, ordinary locking suffices there.
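A sketch of both rules above; shared_lock and shared_count are invented names:
#include <linux/spinlock.h>

static DEFINE_SPINLOCK(shared_lock);
static unsigned long shared_count;

/* Process context sharing data with a bottom half: spin_lock_bh()
 * disables local bottom-half processing and takes the lock. */
static void update_from_process_context(void)
{
        spin_lock_bh(&shared_lock);
        shared_count++;
        spin_unlock_bh(&shared_lock);
}

/* Interrupt context sharing data with a bottom half: disable local
 * interrupts and take the lock. */
static void update_from_bottom_half(void)
{
        unsigned long flags;

        spin_lock_irqsave(&shared_lock, flags);
        shared_count++;
        spin_unlock_irqrestore(&shared_lock, flags);
}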