Blog icon indicating copy to clipboard operation
Blog copied to clipboard

Chapter 8 Bottom Halves and Deferring work

Open jason--liu opened this issue 3 years ago • 0 comments

由于各种限制,中断处理程序只能构成任何中断处理流程的前半部分。 中断是异步发生的,为了避免打断其他代码执行路径(包括启动中断处理),中断处理程序需要尽可能执行得快。 中断处理默认会关闭当前中断,如果中断迟迟执行不完,会导致中断关闭时间过长,这对一些时间敏感的任务是很不友好的,比如网络,关中断太长可能导致丢包。

Bottom Halves

中断处理程序一般需要回复硬件收到了中断,也有可能需要从硬件拷贝数据。没有绝对的界限说哪些事情应该在上半部,哪些应该在下半部。一些区分上下半部的建议:

  • If the work is time sensitive, perform it in the interrupt handler.
  • If the work is related to the hardware, perform it in the interrupt handler
  • If the work needs to ensure that another interrupt (particularly the same interrupt) does not interrupt it, perform it in the interrupt handler.
  • For everything else, consider performing the work in the bottom half

为什么需要下半部

中断发生的时候,当前中断在所有CPU上是关闭的。更糟的是,有IRQF_DISABLED的标志导致所有irq lines在当前CPU上关闭,并且当前irq line在所有CPU上是关闭的。因此,减少中断处理时间对系统性能尤为重要。 中断下半部defer work at any time,通常下半部在中断返回后立即运行。中断下半部运行时,所有中断已打开

softirqs和tasklets

softirqs是静态创建的,必须在静态编译时注册。他们可以同时运行在任何CPU上,甚至两个同类型的softirqs可以同时运行。 tasklets是建立在softirqs之上动态创建的。两个不同的tasklets可以同时运行在不同的CPU上,相同类型的tasklets不能同时运行。 大多数情况下,推荐使用tasklets;当对性能比较敏感时,softirqs是很好的选择。

softirqs

softirq_action代表了softirqs(linux/interrupt.h)

struct softirq_action
{
	void	(*action)(struct softirq_action *);
};

kernel/softirq.c中定义了32个softirq entry.

static struct softirq_action softirq_vec[NR_SOFTIRQS] ;

每注册一个softirq就会用掉数组中的一个entry。

softirq handler

原型如下

void softirq_handler(struct softirq_action *)

softirq不会抢占另外一个softirq。能抢占softirq的只有中断。

运行softirq

softirq在运行之前必须先被marked,也就是所谓的raising the softirq。通常由中断处理程序标记来marks softirq.被标记的softirq在以下几个地方被检查和执行:

  • In the return from hardware interrupt code path
  • In the ksoftirqd kernel thread
  • In any code that explicitly checks for and executes pending softirqs, such as the networking subsystem

softirq执行流程是do_softirq()调用__do_softirq()

u32 pending;

pending = local_softirq_pending(); //挂起软中断的 32 位掩码——如果设置了第 n 位,则第 n 个软中断正在挂起
if (pending) {
	struct softirq_action *h;
	
	/* reset the pending bitmask */
	set_softirq_pending(0);
	
	h = softirq_vec;
	do {
		if (pending & 1)
		h->action(h);
		h++;
		pending >>= 1;
	} while (pending);
}

流程比较简单,如果有pending softirq,__do_softirq挨个遍历,并执行对应的handler。

  1. It sets the pending local variable to the value returned by the local_softirq_pending() macro.This is a 32-bit mask of pending softirqs—if bit n is set, the nth softirq is pending
  2. Now that the pending bitmask of softirqs is saved, it clears the actual bitmask
  3. The pointer h is set to the first entry in the softirq_vec
  4. If the first bit in pending is set, h->action(h) is called
  5. The pointer h is incremented by one so that it now points to the second entry in the softirq_vec array.
  6. The bitmask pending is right-shifted by one.This tosses the first bit away and moves all other bits one place to the right. Consequently, the second bit is now the first (and so on)
  7. The pointer h now points to the second entry in the array, and the pending bitmask now has the second bit as the first. Repeat the previous steps.
  8. Continue repeating until pending is zero, at which point there are no more pending softirqs and the work is done.

使用softirqs

softirq一般是用在对时间非常敏感的业务中,目前内核中只有networking和block devices用到了softirq;当然taklets和内核定时器也是基于softirq实现的。 分配index 添加新的index需要根据优先级来决定添加位置。 image 数值越小,优先级越高,通常情况下新的entry应该在BLOCK_SOFTIRQ和TASKLET_SOFTIRQ之间。 注册handler 通过open_softirq()来动态注册handler,两个参数,Index和handler。比如网卡子系统

open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);

softirq handler运行时中断是打开的,但当前处理器的softirq是关闭的,并且不能睡眠。但其他处理器可以执行softirq,甚至相同的softirq。因此,如果多个softirq访问共享数据,需要加锁,这也是为什么推荐使用tasklets的原因。 Raising softirq 在handler注册后,to mark it pending, 调用raise_softirq(),例如网络子系统

raise_softirq(NET_TX_SOFTIRQ);

void raise_softirq(unsigned int nr)
{
	unsigned long flags;

	local_irq_save(flags);
	raise_softirq_irqoff(nr);
	local_irq_restore(flags);
}

如果是已经关闭中断的情况下,可以直接调用raise_softirq_irqoff

Tasklets

Tasklets是一种基于softirq实现的中断下半部。如前所述,tasklet 由两个软中断表示:HI_SOFTIRQTASKLET_SOFTIRQ。 Tasklets由tasklet_struct表示

struct tasklet_struct {
	struct tasklet_struct *next; 	/* next tasklet in the list */
	unsigned long state; 			/* state of the tasklet */
	atomic_t count; 				/* reference counter */
	void (*func)(unsigned long); 	/* tasklet handler function */
	unsigned long data; 			/* argument to the tasklet function */
};

state域值有 zero, TASKLET_STATE_SCHED, or TASKLET_STATE_RUN.TASKLET_STATE_SCHED denotes a tasklet that is scheduled to run, and TASKLET_STATE_RUN denotes a tasklet that is running. count 字段用作 tasklet 的引用计数,如果非0,tasklet失能且不能运行,如果是0,tasklet使能如果被marked pending,则可以running.

Scheduling Tasklets

scheduling tasklets类似于raising softirq。需要调度的tasklets存储在tasklet_vec(普通tasklet)和tasklet_hi_vec(高优先级tasklet),这两个都是tasklet_structure的链表。调度由tasklet_schedule()tasklet_hi_schedule()函数执行。然后分别再调用 __tasklet_schedule()__tasklet_hi_schedule()函数。流程如下。

void __tasklet_schedule(struct tasklet_struct *t)
{
	__tasklet_schedule_common(t, &tasklet_vec,
				  TASKLET_SOFTIRQ);
}

static void __tasklet_schedule_common(struct tasklet_struct *t,
				      struct tasklet_head __percpu *headp,
				      unsigned int softirq_nr)
{
	struct tasklet_head *head;
	unsigned long flags;

	local_irq_save(flags);
	head = this_cpu_ptr(headp);
	t->next = NULL;
	*head->tail = t;
	head->tail = &(t->next);
	raise_softirq_irqoff(softirq_nr);
	local_irq_restore(flags);
}

  1. Check whether the tasklet’s state is TASKLET_STATE_SCHED. If it is, the tasklet is already scheduled to run and the function can immediately return
  2. Call __tasklet_schedule().
  3. Save the state of the interrupt system, and then disable local interrupts.
  4. Add the tasklet to be scheduled to the head of the tasklet_vec or tasklet_hi_vec linked list, which is unique to each processor in the system.
  5. Raise the TASKLET_SOFTIRQ or HI_SOFTIRQ softirq, so do_softirq() executes this tasklet in the near future.
  6. Restore interrupts to their previous state and return

tasklet的核心执行函数是tasklet_actiontasklet_hi_action。流程如下

static inline int tasklet_trylock(struct tasklet_struct *t)
{
	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}

static inline void tasklet_unlock(struct tasklet_struct *t)
{
	smp_mb__before_atomic();
	clear_bit(TASKLET_STATE_RUN, &(t)->state);
}

static void tasklet_action_common(struct softirq_action *a,
				  struct tasklet_head *tl_head,
				  unsigned int softirq_nr)
{
	struct tasklet_struct *list;

	local_irq_disable();
	list = tl_head->head;
	tl_head->head = NULL;
	tl_head->tail = &tl_head->head;
	local_irq_enable();

	while (list) {
		struct tasklet_struct *t = list;

		list = list->next;

		if (tasklet_trylock(t)) {
			if (!atomic_read(&t->count)) {
				if (!test_and_clear_bit(TASKLET_STATE_SCHED,
							&t->state))
					BUG();
				t->func(t->data);
				tasklet_unlock(t);
				continue;
			}
			tasklet_unlock(t);
		}

		local_irq_disable();
		t->next = NULL;
		*tl_head->tail = t;
		tl_head->tail = &t->next;
		__raise_softirq_irqoff(softirq_nr);
		local_irq_enable();
	}
}
  1. Disable local interrupt delivery and retrieve the tasklet_vec or tasklet_hi_vec list for this processor.
  2. Clear the list for this processor by setting it equal to NULL
  3. Enable local interrupt delivery.
  4. Loop over each pending tasklet in the retrieved list.
  5. If this is a multiprocessing machine, check whether the tasklet is running on another processor by checking the TASKLET_STATE_RUN flag. If it is currently running, do not execute it now and skip to the next pending tasklet.
  6. If the tasklet is not currently running, set the TASKLET_STATE_RUN flag, so another processor will not run it.
  7. Check for a zero count value, to ensure that the tasklet is not disabled. If the tasklet is disabled, skip it and go to the next pending tasklet
  8. We now know that the tasklet is not running elsewhere, is marked as running so it will not start running elsewhere, and has a zero count value. Run the tasklet handler
  9. After the tasklet runs, clear the TASKLET_STATE_RUN flag in the tasklet’s state field.
  10. Repeat for the next pending tasklet, until there are no more scheduled tasklets waiting to run

使用tasklet

静态声明:

DECLARE_TASKLET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data);

The first macro creates the tasklet with a count of zero, and the tasklet is enabled.The second macro sets count to one, and the tasklet is disabled. 动态创建:

struct tasklet_struct t,:
tasklet_init(t, tasklet_handler, dev);     /* dynamically as opposed to statically */

和softirq一样,tasklets也不能睡眠。 调度tasklets

tasklet_schedule(&my_tasklet);     /* mark my_tasklet as pending */

As an optimization, a tasklet always runs on the processor that scheduled it—making better use of the processor’s cache, you hope. 可以通过tasklet_disable()来关闭一个tasklet,这个函数会等待handler执行完成。如果不想等待,可以调用tasklet_disable_nosync()tasklet_enable()使能对应tasklets. tasklet_kill()将tasklets从等待队列移除并等待对应Handler执行完成,这个函数不能用于中断,因为它会睡眠

ksoftirqd

ksoftirq是per-cpu的内核线程(基于CFS,nice 19)。基本流程如下:

for (;;) {
	if (!softirq_pending(cpu))
		schedule();
		
	set_current_state(TASK_RUNNING);
	
	while (softirq_pending(cpu)) {
		do_softirq();
		if (need_resched())
			schedule();
	}
	
	set_current_state(TASK_INTERRUPTIBLE);
}

为什么需要ksoftirqd内核线程? 因为softirq会自己唤醒自己,比如网络子系统,这样会造成用户态进程饥饿。解决用户进程饥饿的一种方式是针对再次pending的softirq不立即处理,等到下次中断来的时候再处理,但这样对softirq可能引起饥饿,特别是处于idle的系统。ksoftirqd是对这个问题的折中,对于再次pengding的softirq,由内核线程来处理,这样用户进程可以抢占,并且对应idle系统,ksoftirq可以很快运行。

工作队列

工作队列将工作推迟在内核线程(kwoker)中完成,工作队列运行在进程上下文,可以睡眠。 数据结构

/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
	struct cpu_workqueue_struct cpu_wq[NR_CPUS];    // per-cpu
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;
	int rt;
};

struct cpu_workqueue_struct {
	spinlock_t lock; /* lock protecting this structure */
	struct list_head worklist; /* list of work */
	wait_queue_head_t more_work;
	struct work_struct *current_struct;
	struct workqueue_struct *wq; /* associated workqueue_struct */
	task_t *thread; /* associated thread */
};

内核线程的worker_thread()函数循环取work处理。

for (;;) {
	prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
	if (list_empty(&cwq->worklist))
	    schedule();
	finish_wait(&cwq->more_work, &wait);
	run_workqueue(cwq);
}
  1. The thread marks itself sleeping (the task’s state is set to TASK_INTERRUPTIBLE) and adds itself to a wait queue.

  2. If the linked list of work is empty, the thread calls schedule() and goes to sleep.

  3. If the list is not empty, the thread does not go to sleep. Instead, it marks itself TASK_RUNNING and removes itself from the wait queue

  4. If the list is nonempty, the thread calls run_workqueue() to perform the deferred work.

run_workqueue简化流程:

while (!list_empty(&cwq->worklist)) {
	struct work_struct *work;
	work_func_t f;
	void *data;
	
	work = list_entry(cwq->worklist.next, struct work_struct, entry);
	f = work->func;
	list_del_init(cwq->worklist.next);
	work_clear_pending(work);
	f(work);
}
  1. While the list is not empty, it grabs the next entry in the list
  2. It retrieves the function that should be called, func, and its argument, data.
  3. It removes this entry from the list and clears the pending bit in the structure itself.
  4. It invokes the function.
  5. Repeat

work由work_struct表示

<linux/workqueue.h>:
struct work_struct {
    atomic_long_t data;
    struct list_head entry;
    work_func_t func;
};

使用工作队列

静态创建:

DECLARE_WORK(name, void (*func)(void *), void *data);

动态创建:

INIT_WORK(struct work_struct *work, void (*func)(void *), void *data);

调度work 将工作放入工作队列。

schedule_work(&work);
schedule_delayed_work(&work, delay);   // delay ticks

刷新工作

void flush_scheduled_work(void)

该函数会等待所有work执行完成才返回,等待过程中会睡眠,因此只能在进程上下文调用。 取消工作队列

int cancel_delayed_work(struct work_struct *work);

创建工作队列

如果默认工作队列不能满足需求,~可以自己创建工作队列,比如对性能要求比较高的场景时~。对性能要求比较高的场景,在高版本内核中可以使用system_highpri_wq队列

struct workqueue_struct *create_workqueue(const char *name);

name即内核线程的名字,这个函数会为每个CPU创建一个内核线程。 在创建工作队列线程后,下面两个函数和schedule_workschedule_work_delay相似。

int queue_work(struct workqueue_struct *wq, struct work_struct *work)
int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)

刷新工作队列

flush_workqueue(struct workqueue_struct *wq)

这个函数和flush_schduled_work功能类似。

如何选择下半部

image

下半部锁竞争

If process context code and a bottom half share data, you need to disable bottom-half processing and obtain a lock before accessing the data. If interrupt context code and a bottom half share data, you need to disable interrupts and obtain a lock before accessing the data. image 上面函数不会关闭工作队列。

jason--liu avatar Jul 06 '21 03:07 jason--liu