DPDKのソースコードから読み解くユーザ空間ドライバとPCIデバイスの関係

執筆者 : 岡部 究


1. はじめに

DPDKを触っていると気になることがありませんか。 「カーネルのEthernetドライバの代わりにユーザ空間でEthernetドライバを作っている、とあるが実際のPCIデバイスにどうやってユーザ空間からアクセスしているのだろうか?」 本記事ではDPDKのソースコードを読み解くことで、上記の疑問を(部分的ながら)解決します。

2. 調査対象の環境

  • Debian GNU/Linux 11.6
  • DPDK 22.11.0

3. ユーザ空間ドライバとカーネル空間ドライバ

DPDKのユーザ空間ドライバはPoll Mode Driver (PMD)と呼ばれ、様々なEthernetを対象としています。 dpdk-devbind.pyスクリプトを覗くことでサポートしているこれらのEthernetのリストを知ることができます。 本記事ではQEMU上でお手軽に試験できるようにnet_virtioドライバを調査します。

さらにPMDはユーザ空間に単に配置すれば動くものではなく、実際のPCIデバイスと繋ぐカーネル空間ドライバが必要になります。 以前はigb_uioなどのカーネル空間ドライバが使用されるケースもありましたが、IOMMUをサポートしているためvfio-pciドライバを今回は使います。

本来ハードウェアとして提供されるPCIデバイスとカーネル空間で動くEthernetドライバが実現している機能がどう上記2つのドライバで実現されているかを本記事では調べます。

4. PCIデバイスの復習

復習のためにPCIデバイスとはどのようなものだったのか振り返りましょう。 PCIデバイスは、以下のメモリ空間を持っていました。

  • PCI コンフィグレーション空間
  • PCI I/O空間
  • PCI メモリ空間

さらにPCIデバイスは割り込みピンを持っており、通常コンテキストの実行中の任意のタイミングで設定した割り込みハンドラを実行することができました。

上記4つの要素がnet_virtioドライバとvfio-pciドライバでどのように扱われているか調べましょう。

5. PCI コンフィグレーション空間からの読み出し

やや天下り的ですが、DPDKにはPCI コンフィグレーション空間のベースアドレスレジスタを読んで、PCI I/O空間かを判定する以下のコードがあります。

static int
pci_vfio_is_ioport_bar(int vfio_dev_fd, int bar_index)
{
    uint32_t ioport_bar;
    int ret;

    ret = pread64(vfio_dev_fd, &ioport_bar, sizeof(ioport_bar),
              VFIO_GET_REGION_ADDR(VFIO_PCI_CONFIG_REGION_INDEX)
              + PCI_BASE_ADDRESS_0 + bar_index*4); // <= PCI コンフィグレーション空間からの読み出し
    if (ret != sizeof(ioport_bar)) {
        RTE_LOG(ERR, EAL, "Cannot read command (%x) from config space!\n",
            PCI_BASE_ADDRESS_0 + bar_index*4);
        return -1;
    }

    return (ioport_bar & PCI_BASE_ADDRESS_SPACE_IO) != 0;
}

上記のようにファイルディスクリプタvfio_dev_fdから指定したオフセットを読み出せばPCI コンフィグレーション空間が読み出せます。 vfio_dev_fdは固定幅の「リージョン」に分割されていて、VFIO_GET_REGION_ADDR(VFIO_PCI_CONFIG_REGION_INDEX)といったオフセット指定で読み出します。 ここではVFIO_PCI_CONFIG_REGION_INDEXを指定しているので、PCI コンフィグレーション空間にアクセスしています。

ではこのvfio_dev_fdは何処からやってきたのでしょうか。 このファイルディスクリプタはrte_vfio_setup_device()関数でオープンされています。 が、この初期化コードは少し長いので先に要点を解説します。

  • シンボリックリンク/sys/bus/pci/devices/PCIアドレス/iommu_groupを読んでIOMMUグループを得ます。例えば以下の例だと18です。
$ readlink /sys/bus/pci/devices/0000\:07\:00.0/iommu_group
../../../../kernel/iommu_groups/18
  • ファイル/dev/vfio/先のIOMMUグループを開いてグループファイルディスクリプタvfio_group_fdを得ます。
  • ioctl(vfio_group_fd, VFIO_GROUP_GET_DEVICE_FD, PCIアドレス)を実行した返り値としてvfio_dev_fdを得ます。

以下は上記解釈の根拠となるコードの抜粋です。

static int
pci_vfio_map_resource_primary(struct rte_pci_device *dev)
{
// --snip--
    ret = rte_vfio_setup_device(rte_pci_get_sysfs_path(), pci_addr,
                    &vfio_dev_fd, &device_info);
// --snip--

#define SYSFS_PCI_DEVICES "/sys/bus/pci/devices"

const char *rte_pci_get_sysfs_path(void)
{
    const char *path = NULL;

#ifdef RTE_EXEC_ENV_LINUX
    path = getenv("SYSFS_PCI_DEVICES");
    if (path == NULL)
        return SYSFS_PCI_DEVICES;
#endif

    return path;
}

int
rte_vfio_setup_device(const char *sysfs_base, const char *dev_addr,
        int *vfio_dev_fd, struct vfio_device_info *device_info)
{
// --snip--
    /* get group number */
    ret = rte_vfio_get_group_num(sysfs_base, dev_addr, &iommu_group_num);
// --snip--
    /* get the actual group fd */
    vfio_group_fd = rte_vfio_get_group_fd(iommu_group_num);
// --snip--
    /* get a file descriptor for the device */
    *vfio_dev_fd = ioctl(vfio_group_fd, VFIO_GROUP_GET_DEVICE_FD, dev_addr);
// --snip--

int
rte_vfio_get_group_num(const char *sysfs_base,
        const char *dev_addr, int *iommu_group_num)
{
// --snip--
    /* try to find out IOMMU group for this device */
    snprintf(linkname, sizeof(linkname),
             "%s/%s/iommu_group", sysfs_base, dev_addr);

    ret = readlink(linkname, filename, sizeof(filename)); // <= シンボリックリンク`/sys/bus/pci/devices/PCIアドレス/iommu_group`を読む
// --snip--
    ret = rte_strsplit(filename, sizeof(filename),
            tok, RTE_DIM(tok), '/');
// --snip--
    /* IOMMU group is always the last token */
    errno = 0;
    group_tok = tok[ret - 1];
    end = group_tok;
    *iommu_group_num = strtol(group_tok, &end, 10); // <= IOMMUグループを文字列から整数に変換
// --snip--

int
rte_vfio_get_group_fd(int iommu_group_num)
{
    struct vfio_config *vfio_cfg;

    /* get the vfio_config it belongs to */
    vfio_cfg = get_vfio_cfg_by_group_num(iommu_group_num);
    vfio_cfg = vfio_cfg ? vfio_cfg : default_vfio_cfg;

    return vfio_get_group_fd(vfio_cfg, iommu_group_num);
}

static int
vfio_get_group_fd(struct vfio_config *vfio_cfg,
        int iommu_group_num)
{
// --snip--
    vfio_group_fd = vfio_open_group_fd(iommu_group_num);
// --snip--
    return vfio_group_fd;
}

#define VFIO_GROUP_FMT "/dev/vfio/%u"

static int
vfio_open_group_fd(int iommu_group_num)
{
// --snip--
    /* if primary, try to open the group */
    if (internal_conf->process_type == RTE_PROC_PRIMARY) {
        /* try regular group format */
        snprintf(filename, sizeof(filename),
                 VFIO_GROUP_FMT, iommu_group_num);
        vfio_group_fd = open(filename, O_RDWR); // <= ファイル`/dev/vfio/先のIOMMUグループ`を開く
// --snip--
        return vfio_group_fd;

6. PCI I/O空間の読み書き

前章でお話しした通り、ファイルディスクリプタvfio_dev_fdにはリージョンがありました。

DPDKのソースコードでVFIO_GET_REGION_ADDR()マクロを使ってPCI コンフィグレーション空間以外のリージョンにアクセスしている箇所を調べると以下のコードが見つかります。

int
pci_vfio_ioport_map(struct rte_pci_device *dev, int bar,
            struct rte_pci_ioport *p)
{
    if (bar < VFIO_PCI_BAR0_REGION_INDEX ||
        bar > VFIO_PCI_BAR5_REGION_INDEX) {
        RTE_LOG(ERR, EAL, "invalid bar (%d)!\n", bar);
        return -1;
    }

    p->dev = dev;
    p->base = VFIO_GET_REGION_ADDR(bar);
    return 0;
}

void
pci_vfio_ioport_read(struct rte_pci_ioport *p,
             void *data, size_t len, off_t offset)
{
    const struct rte_intr_handle *intr_handle = p->dev->intr_handle;
    int vfio_dev_fd = rte_intr_dev_fd_get(intr_handle);

    if (vfio_dev_fd < 0)
        return;

    if (pread64(vfio_dev_fd, data,
            len, p->base + offset) <= 0)

しかし今回作った環境では上記の関数は使われないようです。 virtioデバイスにはmodernとlegacyの二種類があり、今回の環境で動作しているのはmodernで、PCI I/O空間のアクセスはlegacyの時のみのようです。

いずれにしてもPCI コンフィグレーション空間と同様に、PCI I/O空間へもファイルディスクリプタvfio_dev_fdのリージョンに読み書きすればアクセスできることがわかりました。

7. PCI メモリ空間の読み書き

これも天下り的になりますが、DPDKでメモリレジスタにアクセスしている箇所を調べてみましょう。きっとvolatileを使っているはずです。 するとrte_read16()関数のようなわかりやすいAPIが見つかります。

static __rte_always_inline uint16_t
rte_read16(const volatile void *addr)
{
    uint16_t val;
    val = rte_read16_relaxed(addr);
    rte_io_rmb();
    return val;
}

static __rte_always_inline uint16_t
rte_read16_relaxed(const volatile void *addr)
{
    return *(const volatile uint16_t *)addr;
}

このrte_read16()関数はnet_virtioドライバでは例えば以下のように使われます。

static int
modern_setup_queue(struct virtio_hw *hw, struct virtqueue *vq)
{
    struct virtio_pci_dev *dev = virtio_pci_get_dev(hw);
// --snip--
    notify_off = rte_read16(&dev->common_cfg->queue_notify_off);

dev->common_cfgポインタはstruct virtio_pci_common_cfg *型で、以下のようにレジスタの定義が羅列してあります。

struct virtio_pci_common_cfg {
    /* About the whole device. */
    uint32_t device_feature_select;    /* read-write */
    uint32_t device_feature;   /* read-only */
    uint32_t guest_feature_select; /* read-write */
    uint32_t guest_feature;        /* read-write */
    uint16_t msix_config;      /* read-write */
    uint16_t num_queues;       /* read-only */
    uint8_t device_status;     /* read-write */
    uint8_t config_generation; /* read-only */

    /* About a specific virtqueue. */
    uint16_t queue_select;     /* read-write */
    uint16_t queue_size;       /* read-write, power of 2. */
    uint16_t queue_msix_vector;    /* read-write */
    uint16_t queue_enable;     /* read-write */
    uint16_t queue_notify_off; /* read-only */

dev->common_cfgポインタはPCIベースアドレスに配置されたPCIケーパビリティです。 このPCIケーパビリティはVIRTIO Committee Specification "4.1.4 Virtio Structure PCI Capabilities"で詳しく解説されています。

コードから追ってみましょう。PCIケーパビリティの読み出しはvirtio_read_caps()関数からはじまります。

int
vtpci_init(struct rte_pci_device *pci_dev, struct virtio_pci_dev *dev)
{
    struct virtio_hw *hw = &dev->hw;

    RTE_BUILD_BUG_ON(offsetof(struct virtio_pci_dev, hw) != 0);

    /*
    * Try if we can succeed reading virtio pci caps, which exists
    * only on modern pci device. If failed, we fallback to legacy
    * virtio handling.
    */
    if (virtio_read_caps(pci_dev, hw) == 0) {

virtio_read_caps()関数はrte_pci_map_device()関数を呼び出してPCI メモリ空間をマップします。 このマップは後に調べるとして、virtio_read_caps()関数の残りの部分ではVIRTIO Committee Specificationで説明されている通りPCI コンフィグレーション空間からPCIケーパビリティを辿ります。

static int
virtio_read_caps(struct rte_pci_device *pci_dev, struct virtio_hw *hw)
{
    struct virtio_pci_dev *dev = virtio_pci_get_dev(hw);
    uint8_t pos;
    struct virtio_pci_cap cap;
    int ret;

    if (rte_pci_map_device(pci_dev)) {
        PMD_INIT_LOG(DEBUG, "failed to map pci device!");
        return -1;
    }

    ret = rte_pci_read_config(pci_dev, &pos, 1, PCI_CAPABILITY_LIST);
// --snip--
    while (pos) {
        ret = rte_pci_read_config(pci_dev, &cap, 2, pos);
// --snip--
        ret = rte_pci_read_config(pci_dev, &cap, sizeof(cap), pos);
        if (ret != sizeof(cap)) {
            PMD_INIT_LOG(DEBUG,
                     "failed to read pci cap at pos: %x ret %d",
                     pos, ret);
            break;
        }

        PMD_INIT_LOG(DEBUG,
            "[%2x] cfg type: %u, bar: %u, offset: %04x, len: %u",
            pos, cap.cfg_type, cap.bar, cap.offset, cap.length);

        switch (cap.cfg_type) {
        case VIRTIO_PCI_CAP_COMMON_CFG:
            dev->common_cfg = get_cfg_addr(pci_dev, &cap);
            break;
// --snip--
next:
        pos = cap.cap_next;
    }

/* Read PCI config space. */
int rte_pci_read_config(const struct rte_pci_device *device,
        void *buf, size_t len, off_t offset)
{
    char devname[RTE_DEV_NAME_MAX_LEN] = "";
    const struct rte_intr_handle *intr_handle = device->intr_handle;

    switch (device->kdrv) {
    case RTE_PCI_KDRV_IGB_UIO:
    case RTE_PCI_KDRV_UIO_GENERIC:
        return pci_uio_read_config(intr_handle, buf, len, offset);
#ifdef VFIO_PRESENT
    case RTE_PCI_KDRV_VFIO:
        return pci_vfio_read_config(intr_handle, buf, len, offset);
// --snip--

int
pci_vfio_read_config(const struct rte_intr_handle *intr_handle,
            void *buf, size_t len, off_t offs)
{
    int vfio_dev_fd = rte_intr_dev_fd_get(intr_handle);

    if (vfio_dev_fd < 0)
        return -1;

    return pread64(vfio_dev_fd, buf, len,
           VFIO_GET_REGION_ADDR(VFIO_PCI_CONFIG_REGION_INDEX) + offs); // <= 5章で調べた通りPCI コンフィグレーション空間からの読み出し

上記のコードではPCIケーパビリティのチェーンを辿って、VIRTIO_PCI_CAP_COMMON_CFGというタイプを見つけたらそのケーパビリティをget_cfg_addr()関数でユーザ空間仮想アドレスのポインタに変換してdev->common_cfgポインタに代入しています。 先のmodern_setup_queue()関数はこのdev->common_cfgポインタの先を読み出していました。 具体的にget_cfg_addr()関数が何を返すのかというと、以下のようにポインタdev->mem_resource[cap->bar].addr + cap->offsetです。

static void *
get_cfg_addr(struct rte_pci_device *dev, struct virtio_pci_cap *cap)
{
    uint8_t  bar    = cap->bar;
    uint32_t length = cap->length;
    uint32_t offset = cap->offset;
    uint8_t *base;
// --snip--
    base = dev->mem_resource[bar].addr;
    if (base == NULL) {
        PMD_INIT_LOG(ERR, "bar %u base addr is NULL", bar);
        return NULL;
    }

    return base + offset;
}

cap->barcap->offsetはPCI コンフィグレーション空間から読み出した値です。 ではdev->mem_resource[]配列はどこで初期化されたのでしょうか?

ここで、rte_pci_map_device()関数の続きを調べましょう。 vfio-pciドライバの場合、この関数は単にpci_vfio_map_resource_primary()関数を呼び出します。

int
rte_pci_map_device(struct rte_pci_device *dev)
{
    int ret = -1;

    /* try mapping the NIC resources using VFIO if it exists */
    switch (dev->kdrv) {
    case RTE_PCI_KDRV_VFIO:
#ifdef VFIO_PRESENT
        if (pci_vfio_is_enabled())
            ret = pci_vfio_map_resource(dev);
#endif
// --snip--

int
pci_vfio_map_resource(struct rte_pci_device *dev)
{
    if (rte_eal_process_type() == RTE_PROC_PRIMARY)
        return pci_vfio_map_resource_primary(dev);
// --snip--

pci_vfio_map_resource_primary()関数はpci_vfio_get_region_info()関数を経由してvfio_dev_fdにioctl(2)を呼び出してリージョンの情報を読み出します。この情報の中にはリージョンのオフセットreg->offsetとサイズreg->sizeが入っています。 これらオフセットとサイズと共に、PCI メモリ空間をマップするアドレスをhugepageの後ろから決めて、vfio_res->maps[]配列に書き込みます。

void *pci_map_addr = NULL;

static int
pci_vfio_map_resource_primary(struct rte_pci_device *dev)
{
// --snip--
    /* map BARs */
    maps = vfio_res->maps;
// --snip--
    for (i = 0; i < vfio_res->nb_maps; i++) {
        struct vfio_region_info *reg = NULL;
        void *bar_addr;

        ret = pci_vfio_get_region_info(vfio_dev_fd, &reg, i);
// --snip--
        /* try mapping somewhere close to the end of hugepages */
        if (pci_map_addr == NULL)
            pci_map_addr = pci_find_max_end_va();

        bar_addr = pci_map_addr;
        pci_map_addr = RTE_PTR_ADD(bar_addr, (size_t) reg->size);

        pci_map_addr = RTE_PTR_ALIGN(pci_map_addr,
                    sysconf(_SC_PAGE_SIZE));

        maps[i].addr = bar_addr;
        maps[i].offset = reg->offset;
        maps[i].size = reg->size;
        maps[i].path = NULL; /* vfio doesn't have per-resource paths */

        ret = pci_vfio_mmap_bar(vfio_dev_fd, vfio_res, i, 0);
        if (ret < 0) {
            RTE_LOG(ERR, EAL, "%s mapping BAR%i failed: %s\n",
                    pci_addr, i, strerror(errno));
            free(reg);
            goto err_vfio_res;
        }

        dev->mem_resource[i].addr = maps[i].addr;

        free(reg);
    }
// --snip--

static int
pci_vfio_get_region_info(int vfio_dev_fd, struct vfio_region_info **info,
        int region)
{
    struct vfio_region_info *ri;
    size_t argsz = sizeof(*ri);
    int ret;

    ri = malloc(sizeof(*ri));
    if (ri == NULL) {
        RTE_LOG(ERR, EAL,
            "Cannot allocate memory for VFIO region info\n");
        return -1;
    }
again:
    memset(ri, 0, argsz);
    ri->argsz = argsz;
    ri->index = region;

    ret = ioctl(vfio_dev_fd, VFIO_DEVICE_GET_REGION_INFO, ri);
// --snip--
    *info = ri;

    return 0;
}

pci_vfio_mmap_bar()関数はvfio_res->maps[]配列に書き込まれているリージョンのオフセットとサイズでvfio_dev_fdディスクリプタをmmap(2)でメモリにマップします。

static int
pci_vfio_mmap_bar(int vfio_dev_fd, struct mapped_pci_resource *vfio_res,
        int bar_index, int additional_flags)
{
    struct memreg {
        uint64_t offset;
        size_t   size;
    } memreg[2] = {};
    void *bar_addr;
    struct pci_msix_table *msix_table = &vfio_res->msix_table;
    struct pci_map *bar = &vfio_res->maps[bar_index];
// --snip--
    /* reserve the address using an inaccessible mapping */
    bar_addr = mmap(bar->addr, bar->size, 0, MAP_PRIVATE |
            MAP_ANONYMOUS | additional_flags, -1, 0);
    if (bar_addr != MAP_FAILED) {
        void *map_addr = NULL;
        if (memreg[0].size) {
            /* actual map of first part */
            map_addr = pci_map_resource(bar_addr, vfio_dev_fd,
                            memreg[0].offset,
                            memreg[0].size,
                            RTE_MAP_FORCE_ADDRESS);
        }
// --snip--
    bar->addr = bar_addr;
    return 0;
}

void *
pci_map_resource(void *requested_addr, int fd, off_t offset, size_t size,
         int additional_flags)
{
    void *mapaddr;

    /* Map the PCI memory resource of device */
    mapaddr = rte_mem_map(requested_addr, size,
        RTE_PROT_READ | RTE_PROT_WRITE,
        RTE_MAP_SHARED | additional_flags, fd, offset);
// --snip--

void *
rte_mem_map(void *requested_addr, size_t size, int prot, int flags,
    int fd, uint64_t offset)
{
// --snip--
    return mem_map(requested_addr, size, sys_prot, sys_flags, fd, offset);
}

static void *
mem_map(void *requested_addr, size_t size, int prot, int flags,
    int fd, uint64_t offset)
{
    void *virt = mmap(requested_addr, size, prot, flags, fd, offset);

結局メモリマップドI/Oといっても特殊なことはしておらず、PCI I/Oメモリ空間をアクセスした時と同様にvfio_dev_fdディスクリプタを経由してリージョンにアクセスしているだけです。 PCI I/Oメモリ空間との違いはmmap(2)でメモリにマップしてから読み書きすることです。

8. 割り込みの取り扱い

リンクアップ/ダウンをQEMU上で実行するとvirtio_interrupt_handler()関数が実行されることがログからわかります。 この関数はrte_intr_callback_register()関数でDPDKの起動時にコールバックとしてstruct rte_intr_source_list intr_sourcesに登録します。

int
rte_intr_callback_register(const struct rte_intr_handle *intr_handle,
            rte_intr_callback_fn cb, void *cb_arg)
{
    int ret, wake_thread;
    struct rte_intr_source *src;
    struct rte_intr_callback *callback;
// --snip--
    callback->cb_fn = cb; // <= コールバック関数ポインタ
    callback->cb_arg = cb_arg;
    callback->pending_delete = 0;
    callback->ucb_fn = NULL;

    rte_spinlock_lock(&intr_lock);

    /* check if there is at least one callback registered for the fd */
    TAILQ_FOREACH(src, &intr_sources, next) {
        if (rte_intr_fd_get(src->intr_handle) == rte_intr_fd_get(intr_handle)) {
            /* we had no interrupts for this */
            if (TAILQ_EMPTY(&src->callbacks))
                wake_thread = 1;

            TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);
            ret = 0;
            break;
        }
    }

以下のように、専用のスレッドでepoll待ち合わせを抜けたら、このリストに登録されたコールバックを実行します。

static __rte_noreturn void *
eal_intr_thread_main(__rte_unused void *arg)
{
    /* host thread, never break out */
    for (;;) {
        /* build up the epoll fd with all descriptors we are to
        * wait on then pass it to the handle_interrupts function
        */
        static struct epoll_event pipe_event = {
            .events = EPOLLIN | EPOLLPRI,
        };
        struct rte_intr_source *src;
        unsigned numfds = 0;

        /* create epoll fd */
        int pfd = epoll_create(1);
// --snip--
        rte_spinlock_lock(&intr_lock);

        TAILQ_FOREACH(src, &intr_sources, next) {
            struct epoll_event ev;

            if (src->callbacks.tqh_first == NULL)
                continue; /* skip those with no callbacks */
            memset(&ev, 0, sizeof(ev));
            ev.events = EPOLLIN | EPOLLPRI | EPOLLRDHUP | EPOLLHUP;
            ev.data.fd = rte_intr_fd_get(src->intr_handle); // <= 割り込みファイルディスクリプタ取得

            /**
            * add all the uio device file descriptor
            * into wait list.
            */
            if (epoll_ctl(pfd, EPOLL_CTL_ADD,
                    rte_intr_fd_get(src->intr_handle), &ev) < 0) {
                rte_panic("Error adding fd %d epoll_ctl, %s\n",
                    rte_intr_fd_get(src->intr_handle),
                    strerror(errno));
            }
            else
                numfds++;
        }
        rte_spinlock_unlock(&intr_lock);
        /* serve the interrupt */
        eal_intr_handle_interrupts(pfd, numfds);
// --snip--

static void
eal_intr_handle_interrupts(int pfd, unsigned totalfds)
{
    struct epoll_event events[totalfds];
    int nfds = 0;

    for(;;) {
        nfds = epoll_wait(pfd, events, totalfds,
            EAL_INTR_EPOLL_WAIT_FOREVER); // <= 指定したファイフディスクリプタでepoll待ち合わせ
// --snip--
        /* epoll_wait has at least one fd ready to read */
        if (eal_intr_process_interrupts(events, nfds) < 0)
// --snip--

static int
eal_intr_process_interrupts(struct epoll_event *events, int nfds)
{
// --snip--
    for (n = 0; n < nfds; n++) {
// --snip--
        rte_spinlock_lock(&intr_lock);
        TAILQ_FOREACH(src, &intr_sources, next)
            if (rte_intr_fd_get(src->intr_handle) == events[n].data.fd)
                break;
// --snip--
        if (bytes_read > 0) {
            /**
            * read out to clear the ready-to-be-read flag
            * for epoll_wait.
            */
            bytes_read = read(events[n].data.fd, &buf, bytes_read);
            if (bytes_read < 0) {
// --snip--
            } else if (bytes_read == 0)
                RTE_LOG(ERR, EAL, "Read nothing from file "
                    "descriptor %d\n", events[n].data.fd);
            else
                call = true;
        }

        /* grab a lock, again to call callbacks and update status. */
        rte_spinlock_lock(&intr_lock);

        if (call) {

            /* Finally, call all callbacks. */
            TAILQ_FOREACH(cb, &src->callbacks, next) {

                /* make a copy and unlock. */
                active_cb = *cb;
                rte_spinlock_unlock(&intr_lock);

                /* call the actual callback */
                active_cb.cb_fn(active_cb.cb_arg); // <= 登録されているコールバックを呼び出す

                /*get the lock back. */
                rte_spinlock_lock(&intr_lock);
            }

rte_intr_fd_get(src->intr_handle)で取得して得られるファイルディスクリプタはpci_vfio_setup_interrupts()関数で設定しています。

static int
pci_vfio_setup_interrupts(struct rte_pci_device *dev, int vfio_dev_fd)
{
// --snip--
        /* set up an eventfd for interrupts */
        fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        if (fd < 0) {
            RTE_LOG(ERR, EAL, "Cannot set up eventfd, error "
                    "%i (%s)\n", errno, strerror(errno));
            return -1;
        }

        if (rte_intr_fd_set(dev->intr_handle, fd))
            return -1;

このeventfdはvfio_enable_msix()関数でVFIOドライバに渡しています。

static int
vfio_enable_msix(const struct rte_intr_handle *intr_handle) {
    int len, ret;
    char irq_set_buf[MSIX_IRQ_SET_BUF_LEN];
    struct vfio_irq_set *irq_set;
    int *fd_ptr, vfio_dev_fd, i;

    len = sizeof(irq_set_buf);

    irq_set = (struct vfio_irq_set *) irq_set_buf;
    irq_set->argsz = len;
    /* 0 < irq_set->count < RTE_MAX_RXTX_INTR_VEC_ID + 1 */
    irq_set->count = rte_intr_max_intr_get(intr_handle) ?
        (rte_intr_max_intr_get(intr_handle) >
         RTE_MAX_RXTX_INTR_VEC_ID + 1 ?    RTE_MAX_RXTX_INTR_VEC_ID + 1 :
         rte_intr_max_intr_get(intr_handle)) : 1;

    irq_set->flags = VFIO_IRQ_SET_DATA_EVENTFD | VFIO_IRQ_SET_ACTION_TRIGGER;
    irq_set->index = VFIO_PCI_MSIX_IRQ_INDEX;
    irq_set->start = 0;
    fd_ptr = (int *) &irq_set->data;
    /* INTR vector offset 0 reserve for non-efds mapping */
    fd_ptr[RTE_INTR_VEC_ZERO_OFFSET] = rte_intr_fd_get(intr_handle);
    for (i = 0; i < rte_intr_nb_efd_get(intr_handle); i++) {
        fd_ptr[RTE_INTR_VEC_RXTX_OFFSET + i] =
            rte_intr_efds_index_get(intr_handle, i);
    }

    vfio_dev_fd = rte_intr_dev_fd_get(intr_handle);
    ret = ioctl(vfio_dev_fd, VFIO_DEVICE_SET_IRQS, irq_set);

VFIOドライバが割り込みハンドラでeventfd_signal(9)を呼び出すことで、先述のepollが起床します。

9. 今回取り上げなかったこと

今回の記事ではDPDKのユーザ空間と、Linuxのカーネル空間の境界を知るためにPCIデバイスがどうDPDKに見えているか深掘りしました。 しかし今回の記事では都合上、以下については扱いませんでした。

  • PMDによるパケット送受信の詳しい仕組み
  • vfio-pciドライバと実際のPCIデバイスの責任分担

これらは機会があれば、次回以降の記事で明らかにしたいと思います。

10. まとめ

本記事ではDPDKのソースコードを読みながらPMDとPCIデバイスの関係を調べました。 ソースコードを読むまではPMDがどう実現されているのかはっきりとは理解できませんでしたが、調査の結果以下で実現されていることがわかりました。

  • epoll_wait(2)
  • eventfd(2)
  • ioctl(2)
  • mmap(2)
  • pthread
  • sysfs

これらは特殊な仕組みではなく、Linuxではありふれた技術です。 「ユーザ空間ドライバを作る」といったこれまでの枠組みと離れた機能を実現しようとする際もまずは既存の仕組みが使えないか考えるべきだと言えるでしょう。

一方で今回の調査では割り込みハンドラが実行されるまでは比較的時間がかかりそうに読めました。 リンクアップ/ダウンを伝える程度の割り込みであればこのPMDの実装で良いかもしれませんが、割り込み応答がパフォーマンスに影響を大きく与える場合には別の方法を検討した方が良さそうです。