Day 27:

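If this article was useful to you, consider supporting the Gthulhu project with a 🌟. The short-term goal is to collect 300 🌟 so that the project can be accepted into the CNCF Landscape [ref]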

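Projects derived from sched_ext are almost all GPL-licensed. Because the GPL is comparatively strict, many companies may decline to adopt sched_ext or Gthulhu rather than risk disclosing their trade secrets.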

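To address this, I adopted a suggestion from Professor Jserv: the core user-space implementation of Gthulhu has been split out and released under the more permissive Apache 2.0 license. Users who need to customize a private scheduler to some degree can do so without having to make their modifications publicly available.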

type CustomScheduler interface {
    // Drain the queued task from eBPF and return the number of tasks drained
    DrainQueuedTask(s Sched) int
    // Select a task from the queued tasks and return it
    SelectQueuedTask(s Sched) *models.QueuedTask
    // Select a CPU for the given queued task. After the CPU is selected, the Scheduler dispatches the task to that CPU
    SelectCPU(s Sched, t *models.QueuedTask) (error, int32)
    // Determine the time slice for the given task
    DetermineTimeSlice(s Sched, t *models.QueuedTask) uint64
    // Get the number of tasks in the pool (waiting to be dispatched).
    // The scheduler calls GetPoolCount to report how many tasks are still waiting to be dispatched (NotifyComplete)
    GetPoolCount() uint64
}

Any type that satisfies the interface above can replace Gthulhu's default scheduling behavior (a vtime-based scheduler).
Note that several of these methods take a Sched instance:

type Sched interface {
    DequeueTask(task *models.QueuedTask)
    DefaultSelectCPU(t *models.QueuedTask) (error, int32)
}
  • DequeueTask is implemented by Gthulhu and takes a task awaiting scheduling out of the eBPF map.
  • DefaultSelectCPU calls the default select cpu hook; this part can be replaced with your own logic.
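
To make the contract between the two interfaces concrete, here is a minimal sketch of a first-in, first-out plugin. It is an illustration under stated assumptions, not code from Gthulhu: QueuedTask, Sched, and CustomScheduler are local stand-ins for the real types so that the example compiles on its own, FIFOScheduler and stubSched are hypothetical names, and the `Pid == -1` "queue is empty" sentinel is assumed to match the convention used by the mock later in this article.

```go
package main

import "fmt"

// QueuedTask is a local stand-in for models.QueuedTask (only the field used here).
type QueuedTask struct {
	Pid int32
}

// Sched mirrors the two-method interface shown above, using the stand-in type.
type Sched interface {
	DequeueTask(task *QueuedTask)
	DefaultSelectCPU(t *QueuedTask) (error, int32)
}

// CustomScheduler mirrors the plugin interface, again with the stand-in type.
type CustomScheduler interface {
	DrainQueuedTask(s Sched) int
	SelectQueuedTask(s Sched) *QueuedTask
	SelectCPU(s Sched, t *QueuedTask) (error, int32)
	DetermineTimeSlice(s Sched, t *QueuedTask) uint64
	GetPoolCount() uint64
}

// FIFOScheduler is a hypothetical plugin that dispatches tasks in arrival order.
type FIFOScheduler struct {
	pool  []*QueuedTask
	slice uint64 // fixed time slice handed to every task
}

// Compile-time check that FIFOScheduler satisfies the interface.
var _ CustomScheduler = (*FIFOScheduler)(nil)

func NewFIFOScheduler(slice uint64) *FIFOScheduler {
	return &FIFOScheduler{slice: slice}
}

// DrainQueuedTask pulls tasks until it sees Pid == -1 (assumed sentinel).
func (f *FIFOScheduler) DrainQueuedTask(s Sched) int {
	drained := 0
	for {
		t := &QueuedTask{}
		s.DequeueTask(t)
		if t.Pid == -1 {
			return drained
		}
		f.pool = append(f.pool, t)
		drained++
	}
}

// SelectQueuedTask pops the oldest task, or returns nil when the pool is empty.
func (f *FIFOScheduler) SelectQueuedTask(s Sched) *QueuedTask {
	if len(f.pool) == 0 {
		return nil
	}
	t := f.pool[0]
	f.pool = f.pool[1:]
	return t
}

// SelectCPU defers to the default CPU selection provided by the Sched.
func (f *FIFOScheduler) SelectCPU(s Sched, t *QueuedTask) (error, int32) {
	return s.DefaultSelectCPU(t)
}

func (f *FIFOScheduler) DetermineTimeSlice(s Sched, t *QueuedTask) uint64 { return f.slice }

func (f *FIFOScheduler) GetPoolCount() uint64 { return uint64(len(f.pool)) }

// stubSched is a tiny in-memory Sched used only to run this sketch.
type stubSched struct {
	tasks []QueuedTask
	next  int
}

func (s *stubSched) DequeueTask(task *QueuedTask) {
	if s.next >= len(s.tasks) {
		task.Pid = -1 // nothing left to hand out
		return
	}
	*task = s.tasks[s.next]
	s.next++
}

func (s *stubSched) DefaultSelectCPU(t *QueuedTask) (error, int32) { return nil, 0 }

func main() {
	sched := &stubSched{tasks: []QueuedTask{{Pid: 100}, {Pid: 200}, {Pid: 300}}}
	fifo := NewFIFOScheduler(5000 * 1000)
	fmt.Println("drained:", fifo.DrainQueuedTask(sched))
	for fifo.GetPoolCount() > 0 {
		t := fifo.SelectQueuedTask(sched)
		_, cpu := fifo.SelectCPU(sched, t)
		fmt.Println(t.Pid, cpu, fifo.DetermineTimeSlice(sched, t))
	}
}
```

The plugin only ever talks to the Sched interface, so the same FIFOScheduler could be handed either a real eBPF-backed implementation or an in-memory stub like the one here.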

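In addition, because the core implementation has been split out, the plugin package itself does not depend on libbpfgo/scx/libbpf and does not need cgo.
This makes testing much easier: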

// MockScheduler implements the plugin.Sched interface for testing
type MockScheduler struct {
    taskQueue     []*models.QueuedTask
    queueIndex    int
    cpuAllocated  map[int32]int32 // PID -> CPU mapping
    defaultCPU    int32
    dequeueCount  int
    selectCPUCall int
}

// Compile-time check that MockScheduler implements plugin.Sched
var _ plugin.Sched = (*MockScheduler)(nil)

// NewMockScheduler creates a new mock scheduler for testing
func NewMockScheduler() *MockScheduler {
    return &MockScheduler{
        taskQueue:    make([]*models.QueuedTask, 0),
        queueIndex:   0,
        cpuAllocated: make(map[int32]int32),
        defaultCPU:   0,
    }
}

// EnqueueTask adds a task to the mock scheduler's queue
func (m *MockScheduler) EnqueueTask(task *models.QueuedTask) {
    m.taskQueue = append(m.taskQueue, task)
}

// DequeueTask implements plugin.Sched.DequeueTask
func (m *MockScheduler) DequeueTask(task *models.QueuedTask) {
    m.dequeueCount++
    if m.queueIndex >= len(m.taskQueue) {
        // No more tasks, return sentinel value
        task.Pid = -1
        return
    }

    // Copy the task from queue
    qt := m.taskQueue[m.queueIndex]
    *task = *qt
    m.queueIndex++
}

// DefaultSelectCPU implements plugin.Sched.DefaultSelectCPU
func (m *MockScheduler) DefaultSelectCPU(t *models.QueuedTask) (error, int32) {
    m.selectCPUCall++
    // Simple round-robin CPU selection
    cpu := m.defaultCPU
    m.defaultCPU = (m.defaultCPU + 1) % 4 // Assume 4 CPUs
    m.cpuAllocated[t.Pid] = cpu
    return nil, cpu
}

// Reset resets the mock scheduler state
func (m *MockScheduler) Reset() {
    m.taskQueue = make([]*models.QueuedTask, 0)
    m.queueIndex = 0
    m.cpuAllocated = make(map[int32]int32)
    m.defaultCPU = 0
    m.dequeueCount = 0
    m.selectCPUCall = 0
}

In the test file, we define the MockScheduler's behavior up front, which lets us test the Gthulhu plugin without loading any eBPF program:

func TestXXX(t *testing.T) {
    // Create plugin instance
    gthulhuPlugin := NewGthulhuPlugin(5000*1000, 500*1000)

    // Create mock scheduler
    mockSched := NewMockScheduler()

    t.Run("MultipleTasksWorkflow", func(t *testing.T) {
        mockSched.Reset()
        gthulhuPlugin = NewGthulhuPlugin(5000*1000, 500*1000) // Reset plugin

        // Create multiple tasks with different priorities
        tasks := []*models.QueuedTask{
            {Pid: 100, Weight: 100, Vtime: 0, Tgid: 100, StartTs: 1000, StopTs: 2000},
            {Pid: 200, Weight: 150, Vtime: 0, Tgid: 200, StartTs: 1500, StopTs: 2500},
            {Pid: 300, Weight: 80, Vtime: 0, Tgid: 300, StartTs: 2000, StopTs: 3000},
        }

        // Enqueue all tasks
        for _, task := range tasks {
            mockSched.EnqueueTask(task)
        }

        // Drain all tasks
        drained := gthulhuPlugin.DrainQueuedTask(mockSched)
        if drained != 3 {
            t.Errorf("DrainQueuedTask = %d; want 3", drained)
        }

        // Verify pool count
        if gthulhuPlugin.GetPoolCount() != 3 {
            t.Errorf("GetPoolCount = %d; want 3", gthulhuPlugin.GetPoolCount())
        }

        // Process all tasks
        processedTasks := make([]*models.QueuedTask, 0)
        for gthulhuPlugin.GetPoolCount() > 0 {
            task := gthulhuPlugin.SelectQueuedTask(mockSched)
            if task == nil {
                t.Fatal("SelectQueuedTask returned nil while pool count > 0")
            }

            // Select CPU and determine time slice
            err, cpu := gthulhuPlugin.SelectCPU(mockSched, task)
            if err != nil {
                t.Errorf("SelectCPU error: %v", err)
            }
            if cpu < 0 {
                t.Errorf("Invalid CPU selected: %d", cpu)
            }

            _ = gthulhuPlugin.DetermineTimeSlice(mockSched, task)
            processedTasks = append(processedTasks, task)
        }

        // Verify all tasks were processed
        if len(processedTasks) != 3 {
            t.Errorf("Processed tasks = %d; want 3", len(processedTasks))
        }

        // Verify pool is empty
        if gthulhuPlugin.GetPoolCount() != 0 {
            t.Errorf("Final GetPoolCount = %d; want 0", gthulhuPlugin.GetPoolCount())
        }
    })
}
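
The test above checks counts. Since the default behavior is described as vtime-based, an ordering check is a natural companion. The following is a rough sketch of how such a check could look. It is not Gthulhu's actual selection logic: QueuedTask is a local stand-in with only two fields, and vtimeHeap and popOrder are hypothetical helpers built on the standard library's container/heap.

```go
package main

import (
	"container/heap"
	"fmt"
)

// QueuedTask is a local stand-in for models.QueuedTask (only the fields used here).
type QueuedTask struct {
	Pid   int32
	Vtime uint64
}

// vtimeHeap orders tasks by ascending Vtime, breaking ties by Pid so that
// the resulting order is deterministic in tests.
type vtimeHeap []*QueuedTask

func (h vtimeHeap) Len() int { return len(h) }
func (h vtimeHeap) Less(i, j int) bool {
	if h[i].Vtime != h[j].Vtime {
		return h[i].Vtime < h[j].Vtime
	}
	return h[i].Pid < h[j].Pid
}
func (h vtimeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *vtimeHeap) Push(x any)   { *h = append(*h, x.(*QueuedTask)) }
func (h *vtimeHeap) Pop() any {
	old := *h
	n := len(old)
	t := old[n-1]
	old[n-1] = nil // drop the reference so the task can be collected
	*h = old[:n-1]
	return t
}

// popOrder pushes every task and returns the Pids in the order they come out.
func popOrder(tasks []*QueuedTask) []int32 {
	h := &vtimeHeap{}
	for _, t := range tasks {
		heap.Push(h, t)
	}
	order := make([]int32, 0, len(tasks))
	for h.Len() > 0 {
		order = append(order, heap.Pop(h).(*QueuedTask).Pid)
	}
	return order
}

func main() {
	order := popOrder([]*QueuedTask{
		{Pid: 100, Vtime: 30},
		{Pid: 200, Vtime: 10},
		{Pid: 300, Vtime: 20},
	})
	fmt.Println(order) // prints [200 300 100]
}
```

In a real test, the expected order would be compared against the sequence returned by repeated SelectQueuedTask calls rather than against a standalone heap.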

The plugin pattern gives Gthulhu a great deal of flexibility. In the next article, we will try implementing a simple scheduler!