간결한 두 편이면 될 줄 알았던 VPP 시리즈가 길고 긴 세 편이 됐다. 마지막 글은 잡동사니로 채워야 제맛.

RPC 요청 수신 알림

지난 포스트 말미에서 RPC 메시지 전달을 다루면서 수신 프로세스 노드 동작을 대충 뭉개고 넘어갔다. 어떻게 해야 CPU를 낭비하지 않으면서 지연을 최소화 할 수 있을까? VPP의 답은 단순 폴링이다.

vpp/src/vlib/main.c:

static_always_inline void
vlib_main_or_worker_loop (...)
{
  ...

  while (1)
    {
      ...
      if (PREDICT_TRUE (is_main && vm->queue_signal_pending == 0))
        vm->queue_signal_callback (vm);
      ...
    }
}

vpp/src/vlibmemory/memory_vlib.c:

static uword
memclnt_process (...)
{
  ...
  vlib_set_queue_signal_callback (vm, memclnt_queue_callback);
  ...

  while (1)
    {
      ...

      vlib_process_wait_for_event_or_clock (vm, sleep_time);
      vec_reset_length (event_data);
      event_type = vlib_process_get_events (vm, &event_data);
      now = vlib_time_now (vm);

      switch (event_type)
        {
        case QUEUE_SIGNAL_EVENT:
          vm->queue_signal_pending = 0;
          break;
        ...
        }
      ...
    }

  return 0;
}

static void
memclnt_queue_callback (vlib_main_t * vm)
{
  ...

  for (i = 0; i < vec_len (vl_api_queue_cursizes); i++)
    {
      if (*vl_api_queue_cursizes[i])
        {
          vm->queue_signal_pending = 1;
          vm->api_queue_nonempty = 1;
          vlib_process_signal_event (vm, memclnt_node.index,
                                     /* event_type */ QUEUE_SIGNAL_EVENT,
                                     /* event_data */ 0);
          break;
        }
    }
}

즉 메인 루프에서 RPC에 쓰는 공유 메모리 큐 길이를 반복해서 확인하는 것이다. 큐에 메시지가 있으면 memclnt_process() 노드에게 QUEUE_SIGNAL_EVENT를 보내고, 그러면 잠들어 있던 노드가 깨어나서 큐에 있는 RPC 메시지를 처리한다. (관련 소스 코드 여기저기에 “api”라는 단어가 등장한다. 사실 공유 메모리 기반 큐의 주된 용도는 스레드 간 RPC가 아니라 프로그램 외부에 메시지 기반 API를 제공하는 것이다. API는 잠시 후에 다시 잠깐 등장한다.)

이렇게 루프를 돌 때마다 한 번씩 실행되는 루틴들이 몇 가지 있다. 이런 루틴들이 CPU를 야금야금 잡아먹어서 패킷 처리 용량을 떨어뜨리지 않을까 걱정될 수도 있다. 하지만 트래픽 부하가 증가하면 한번에 처리하는 패킷 수가 늘기 때문에 한 루프에서 패킷 처리에 더 많은 CPU 시간을 쓰는 반면 그런 루틴들에는 비슷한 양의 CPU를 쓰게 된다. 즉, 바쁠수록 중요한 일에 쓰는 CPU 시간 비율이 높아진다.

파일 폴링

네트워크 응용 중에는 소켓으로 통신하거나 파일에 접근해야 하는 것들이 있다. 가령 유닉스 소켓을 통해 로컬의 필터링 서버와 통신해야 할 수도 있고, 캐시 파일에서 읽은 데이터를 패킷에 실어 보내야 할 수도 있다. 그런 동작을 블로킹 모드로 수행할 수는 없으니 파일 디스크립터를 poll() 할 수 있는 방법이 필요하다.

여기에도 기반은 반복 실행이다. 메인 스레드가 루프에서 VLIB_NODE_TYPE_PRE_INPUT 타입 노드를 가장 먼저 실행한다.

vpp/src/vlib/main.c:

static_always_inline void
vlib_main_or_worker_loop (...)
{
  ...
  while (1)
    {
      ...
      /* Process pre-input nodes. */
      if (is_main)
        vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
          cpu_time_now = dispatch_node (vm, n,
                                        VLIB_NODE_TYPE_PRE_INPUT,
                                        VLIB_NODE_STATE_POLLING,
                                        /* frame */ 0,
                                        cpu_time_now);
      ...
    }
}

vpp/src/vlib/unix/input.c:

static uword
linux_epoll_input (...)
{
  ...
  linux_epoll_main_t *em = &linux_epoll_main;
  struct epoll_event *e;
  int n_fds_ready;

  ...

  {
    ...
    int timeout_ms = 0, max_timeout_ms = 10;
    f64 vector_rate = vlib_last_vectors_per_main_loop (vm);

    /* If we're not working very hard, decide how long to sleep */
    if (vector_rate < 2 && vm->api_queue_nonempty == 0
        && nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] == 0)
      {
        ...
        timeout_ms = ...;
        node->input_main_loops_per_call = 0;
      }
    else                        /* busy */
      {
        /* Don't come back for a respectable number of dispatch cycles */
        node->input_main_loops_per_call = 1024;
      }

    /* Allow any signal to wakeup our sleep. */
    {
      static sigset_t unblock_all_signals;
      n_fds_ready = epoll_pwait (em->epoll_fd,
                                 em->epoll_events,
                                 vec_len (em->epoll_events),
                                 timeout_ms, &unblock_all_signals);

      /* This kludge is necessary to run over absurdly old kernels */
      if (n_fds_ready < 0 && errno == ENOSYS)
        {
          n_fds_ready = epoll_wait (em->epoll_fd,
                                    em->epoll_events,
                                    vec_len (em->epoll_events), timeout_ms);
        }
    }
  }
  ...

  for (e = em->epoll_events; e < em->epoll_events + n_fds_ready; e++)
    {
      u32 i = e->data.u32;
      clib_file_t *f = poll_elt_at_index (fm->file_pool, i);
      clib_error_t *errors[4];
      int n_errors = 0;

      if (PREDICT_TRUE (!(e->events & EPOLLERR)))
        {
          if (e->events & EPOLLIN)
            {
              errors[n_errors] = f->read_function (f);
              n_errors += errors[n_errors] != 0;
            }
          if (e->events & EPOLLOUT)
            {
              errors[n_errors] = f->write_function (f);
              n_errors += errors[n_errors] != 0;
            }
        }
      else
        {
          if (f->error_function)
            {
              errors[n_errors] = f->error_function (f);
              n_errors += errors[n_errors] != 0;
            }
          else
            close (f->file_descriptor);
        }
      ...
    }

  return 0;
}

VLIB_REGISTER_NODE (linux_epoll_input_node,static) = {
  .function = linux_epoll_input,
  .type = VLIB_NODE_TYPE_PRE_INPUT,
  .name = "unix-epoll-input",
};

다음과 같이 콜백을 등록해 두면 이벤트 발생 시 호출된다. 콜백 내에서 작업을 직접 수행해도 되고 프로세스 노드로 알림을 보내도 된다.

vpp/src/vnet/devices/netmap/netmap.c:

int
netmap_create_if (...)
{
  netmap_main_t *nm = &netmap_main;
  ...

  fd = open ("/dev/netmap", O_RDWR);
  if (fd < 0)
    return VNET_API_ERROR_SUBIF_ALREADY_EXISTS;

  pool_get (nm->interfaces, nif);
  nif->if_index = nif - nm->interfaces;
  nif->fd = fd;
  ...

  {
    clib_file_t template = { 0 };
    template.read_function = netmap_fd_read_ready;
    template.file_descriptor = nif->fd;
    template.private_data = nif->if_index;
    nif->clib_file_index = clib_file_add (&file_main, &template);
  }
  ...
}

그나저나 Netmap 모듈에서 왜 /dev/netmap 파일의 읽기 가능 여부를 확인할까?

인터럽트 모드

이전 포스트에서 언급한 것처럼 DPDK를 폴링 모드 대신 인터럽트 모드로 사용할 수 있다. 도착 패킷 처리 시작까지의 지연이 늘기는 하지만 트래픽 부하가 낮을 때 CPU 사이클을 무의미하게 날리는 걸 피할 수 있다. VPP에서도 비슷한 동작이 가능할까?

인터럽트 모드 동작을 위해선 당연하게도 인터럽트 알림을 받을 방법이 있어야 한다. DPDK와 Netmap 모두 어떤 장치 파일을 통해 알림을 받는다. DPDK에서는 UIO 파일 디스크립터이고 Netmap에서는 /dev/netmap 파일이다. 그 파일에서 읽기가 가능하면 패킷이 들어온 것이다.

vpp/src/vnet/devices/netmap/netmap.c:

static clib_error_t *
netmap_fd_read_ready (clib_file_t * uf)
{
  ...

  /* Schedule the rx node */
  vlib_node_set_interrupt_pending (vm, netmap_input_node.index);

  return 0;
}

vpp/src/vlib/node_funcs.h:

always_inline void
vlib_node_set_interrupt_pending (vlib_main_t * vm, u32 node_index)
{
  vlib_node_main_t *nm = &vm->node_main;
  vlib_node_t *n = vec_elt (nm->nodes, node_index);
  ASSERT (n->type == VLIB_NODE_TYPE_INPUT);
  clib_spinlock_lock_if_init (&nm->pending_interrupt_lock);
  vec_add1 (nm->pending_interrupt_node_runtime_indices, n->runtime_index);
  clib_spinlock_unlock_if_init (&nm->pending_interrupt_lock);
}

미처리 인터럽트 있는 노드 목록(nm->pending_interrupt_node_runtime_indices)에 입력 노드를 추가한다. 근데 스핀락이 등장한다. VPP에서는 공유 데이터 접근 제어 오버헤드를 피하기 위해 가능하면 스레드별로 데이터 사본을 만들어 사용한다. 그래서 코드 전체에서도 스핀락을 쓰는 곳이 얼마 되지 않는데, 여기가 그 중 하나이다. 하지만 위 코드에서는 현재 실행 스레드, 즉 메인 스레드에만 노드를 스케줄 하기 때문에 사실 락이 필요치 않다. 만약 전체 스레드에게 인터럽트를 알리고 싶다면 모든 vm에 대해 vlib_node_set_interrupt_pending()을 호출하면 되는데, 그럴 때 락이 필요하다. 목록을 확인하는 쪽에서도 락을 쓴다.

static_always_inline void
vlib_main_or_worker_loop (...)
{
  ...
  u32 *last_node_runtime_indices = 0;

  ...
  while (1)
    {
      ...

      /* Next handle interrupts. */
      {
        uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
        uword i;
        if (l > 0)
          {
            u32 *tmp;
            if (!is_main)
              clib_spinlock_lock (&nm->pending_interrupt_lock);
            tmp = nm->pending_interrupt_node_runtime_indies;
            nm->pending_interrupt_node_runtime_indices =
              last_node_runtime_indices;
            last_node_runtime_indices = tmp;
            _vec_len (last_node_runtime_indices) = 0;
            if (!is_main)
              clib_spinlock_unlock (&nm->pending_interrupt_lock);
            for (i = 0; i < l; i++)
              {
                n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
                                      last_node_runtime_indices[i]);
                cpu_time_now =
                  dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
                                 VLIB_NODE_STATE_INTERRUPT,
                                 /* frame */ 0,
                                 cpu_time_now);
              }
          }
      }

      ...
    }
}

얼른 목록 포인터를 바꿔치기한 다음 락을 놓고 느긋하게 각 노드를 호출한다. 노드 함수를 실행한 후에 추정 부하에 따라 폴링 모드(VLIB_NODE_STATE_POLLING)와 인터럽트 모드(VLIB_NODE_STATE_INTERRUPT) 사이를 전환한다.

vpp/src/vlib/main.c:

static_always_inline u64
dispatch_node (vlib_main_t * vm,
               vlib_node_runtime_t * node,
               vlib_node_type_t type,
               vlib_node_state_t dispatch_state,
               vlib_frame_t * frame, u64 last_time_stamp)
{
  uword u, v;
  ...

  n = node->function (vm, node, frame);
  ...

  v = vlib_node_runtime_update_stats (...);

  /* When in interrupt mode and vector rate crosses threshold switch to
     polling mode. */
  if ((dispatch_state == VLIB_NODE_STATE_INTERRUPT)
      || (dispatch_state == VLIB_NODE_STATE_POLLING
          && (node->flags & VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE)))
    {
      if ((dispatch_state == VLIB_NODE_STATE_INTERRUPT
           && v >= nm->polling_threshold_vector_length) &&
          !(node->flags & VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE))
        {
          vlib_node_t *n = vlib_get_node (vm, node->node_index);
          n->state = VLIB_NODE_STATE_POLLING;
          node->state = VLIB_NODE_STATE_POLLING;
          node->flags &= ~VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE;
          node->flags |= VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE;
          ...
        }
      else if (dispatch_state == VLIB_NODE_STATE_POLLING
               && v <= nm->interrupt_threshold_vector_length)
        {
          vlib_node_t *n = vlib_get_node (vm, node->node_index);
          if (node->flags & VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE)
            {
              /* Switch to interrupt mode after dispatch in polling one more time.
                 This allows driver to re-enable interrupts. */
              n->state = VLIB_NODE_STATE_INTERRUPT;
              node->state = VLIB_NODE_STATE_INTERRUPT;
              node->flags &= ~VLIB_NODE_FLAG_SWITCH_FROM_INTERRUPT_TO_POLLING_MODE;
              ...
            }
          else
            {
              node->flags |= VLIB_NODE_FLAG_SWITCH_FROM_POLLING_TO_INTERRUPT_MODE;
            }
        }
    }

  return t;
}

분명 인터럽트 모드를 지원하고 작업 부하에 따라 자동으로 모드 전환도 한다. 그럼에도 불구하고 VPP 기반 프로그램은 수신 패킷이 없을 때에도 열심히 CPU를 소모한다. 그럴 수밖에 없는 것이, 들어오는 패킷이 없어도 프로세스 노드 실행과 타이머 처리를 해야 하기 때문이다. timerfd를 이용해 다음 타이머 실행 시점까지 잠들어 있는 걸 고려할 수도 있겠지만, 그러면 공유 메모리 기반 통신에 예측 불가능한 지연이 생긴다. 그 통신 메커니즘을 파일 디스크립터 기반으로 바꿀 수 있을까? 오버헤드를 감수한다면 비교적 쉽게 가능할 수도 있다. 하지만 VPP는 타협 없이 성능을 최대화 하는 쪽을 택했다. CPU 시간이 줄지도 않는데 VPP에서 인터럽트 모드의 의미가 뭐냐고 한다면 다른 노드에게 더 많은 실행 기회를 주는 것이다.

배리어

메인 루프를 거의 다 살펴봤다. 남은 건 다음 세 줄이다.

vpp/src/vlib/main.c:

static_always_inline void
vlib_main_or_worker_loop (...)
{
  ...

  while (1)
    {
      vlib_node_runtime_t *n;

      if (!is_main)
        {
          vlib_worker_thread_barrier_check ();
          vec_foreach (fqm, tm->frame_queue_mains);
            vlib_frame_queue_dequeue (vm, fqm);
        }

      ...
    }
}

아래 두 줄의 frame_queue는 스레드들 사이에서 프레임(노드, 인자 패킷 목록)을 전달하기 위한 메커니즘이다. 예를 들면 동작 중에 작업 스레드 하나를 내리면서 남은 일거리를 다른 스레드에게 넘길 수 있을 것이다. 또는 메인 스레드에 과부하가 걸릴 때 패킷 처리 작업 일부를 다른 스레드에게 나눠 줄 수도 있을 것이다. 하지만 현재 이 메커니즘을 쓰는 곳은 없다.

이제 한 줄 남았다.

vpp/src/vlib/threads.h:

static inline void
vlib_worker_thread_barrier_check (void)
{
  if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {
      vlib_main_t *vm;
      clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
      ...
      while (*vlib_worker_threads->wait_at_barrier)
        ;
      ...
      clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);

      ...
    }
}

즉 메인 스레드에서 *vlib->worker_threads_wait_at_barrier에 0 아닌 값을 할당해서 모든 작업 스레드를 일시 중지 시킬 수 있다. 그 상태에서 접근 경쟁에 신경 쓰지 않고 원하는 작업을 할 수 있다. 가령 플러그인 노드를 추가하거나 뺄 수도 있을 테고 노드의 동작 설정을 바꿀 수도 있을 것이다. 또 프로그램 시작 때 모든 준비가 끝날 때까지 작업 스레드들을 멈춰두는 데도 사용한다. 즉 pthread_barrier_wait()의 busy waiting 버전이다.

코드 조각들

배리어 얘기가 나왔으니 (별 관련이 없는) 메모리 배리어를 살펴보지 않을 수 없다. 그러는 김에 근처의 다른 코드들도 그냥 지나칠 수 없다.

vpp/src/vppinfra/clib.h:

/* Hints to compiler about hot/cold code. */
#define PREDICT_FALSE(x) __builtin_expect((x),0)
#define PREDICT_TRUE(x) __builtin_expect((x),1)

/* Full memory barrier (read and write). */
#define CLIB_MEMORY_BARRIER() __sync_synchronize ()

#if __x86_64__
#define CLIB_MEMORY_STORE_BARRIER() __builtin_ia32_sfense ()
#else
#define CLIB_MEMORY_STORE_BARRIER() __sync_synchronize ()
#endif

...

always_inline uword
first_set (uword x)
{
  return x & -x;
}

PREDICT_TRUE()/PREDICT_FALSE()는 리눅스 커널 소스의 likely()/unlikely()와 동일하다. VPP에서는 메모리 배리어도 컴파일러에게 맡기고 있는데, 결국 mfense 류의 인스트럭션이다. __sync_synchronize()이제 구식인 모양이고 최신 API로는 __atomic_thread_fence(__ATOMIC_SEQ_CST) 정도인 듯하다.

first_set()ffs() 함수랑 비슷하면서 살짝 다르다. 최하위 설정 비트의 위치가 아니라 그 비트 값을 반환한다. 가령 비트맵에 설정된 각 비트를 순회할 때 사용할 수 있다.

또 눈에 띄는 코드가 있다.

vpp/src/vlib/node.h:

#ifdef CLIB_UNIX
  /* Ensure that the stack is aligned on the multiple of the page size */
typedef char
  assert_process_stack_must_be_aligned_exactly_to_page_size_multiple[(sizeof
                                                                      (vlib_process_t)
                                                                      -
                                                                      PAGE_SIZE_MULTIPLE)
                                                                     ==
                                                                     0 ? 0 :
                                                                     -1];
#endif

컴파일 타임 assertion인데, 리눅스 커널 방식이 훨씬 깔끔하다.

linux/include/linux/bug.h:

#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))

CLI, API

잠시 쉬었으니 다시 작업 스레드 일시 정지 시키는 얘기로 돌아가자. 작업 스레드를 멈추는 이유들 중에 노드 설정 변경이 있다. 그렇다는 것은 프로그램 동작 중에 새 설정을 적용하는 메커니즘이 있다는 얘기다. 일부 데몬들의 관행을 따라 SIGHUP을 이용할 수도 있겠지만, VPP는 네트워크 장치를 위한 프레임워크 답게 CLI를 제공한다.

CLI 구현에는 지금까지 등장한 메커니즘들로 충분하다. 소켓을 열어서 파일 콜백을 등록해 두고, 신규 연결이 들어오면 새 프로세스 노드를 만들어서 등록하고, 새 파일 디스크립터로 데이터가 들어오면 그 노드에게 이벤트를 보내고, 그러면 그 노드에서 명령을 읽어서 처리하고 응답을 보낸다. (vpp/src/vlib/unix/cli.c)

히스토리 기능에 간단한 자동 완성까지 지원하고 연결 시에 무려 컬러 배너도 보여 주지만 사용자 인증 기능은 없다. 사용자가 로컬에서 vppctl이나 telnet으로 연결해서 사용하거나 로컬에서 도는 에이전트 프로그램이 명령을 보내는 식이다.

600개 정도의 명령이 있는데, 다음과 같은 식으로 명령을 정의한다.

vpp/src/vlibmemory/memory_vlib.c:

static clib_error_t *
vl_api_client_command (vlib_main_t * vm,
                       unformat_input_t * input, vlib_cli_command_t * cli_cmd)
{
  vl_api_registration_t **regpp, *regp;
  unix_shared_memory_queue_t *q;
  char *health;
  api_main_t *am = &api_main;
  u32 *confused_indices = 0;

  if (!pool_elts (am->vl_clients))
    goto socket_clients;
  vlib_cli_output (vm, "Shared memory clients");
  vlib_cli_output (vm, "%16s %8s %14s %18s %s",
                   "Name", "PID", "Queue Length", "Queue VA", "Health");

  /* *INDENT-OFF* */
  pool_foreach (regpp, am->vl_clients,
  ({
    regp = *regpp;

    if (regp)
      {
        if (regp->unanswered_pings > 0)
          health = "questionable";
        else
          health = "OK";

        q = regp->vl_input_queue;

        vlib_cli_output (vm, "%16s %8d %14d 0x%016llx %s\n",
                         regp->name, q->consumer_pid, q->cursize,
                         q, health);
      }
    else
      {
        ...
      }
  }));
  /* *INDENT-ON* */

  ...
}

VLIB_CLI_COMMAND (cli_show_api_clients_command, static) =
{
  .path = "show api clients",
  .short_help = "Client information",
  .function = vl_api_client_command,
};

위 명령은 공유 메모리 기반 API의 클라이언트들을 나열한다. RPC 때 등장했던 그 공유 메모리이다. API의 주요 클라이언트는 (Honeycomb의 뒤를 이은) hc2vpp인데, 이 에이전트가 VPP를 Opendaylight 컨트롤러로 연결해 준다. VPP 소스 곳곳에 API를 명세하는 파일(*.api)과 API 구현 코드(*_api.c)가 있다.

패킷 출력

그래프 포스트에서 두리뭉실 넘어간 것 중 하나가 출력 노드이다. 특별한 건 없다. 다음과 같이 출력 장치를 정의하고, 모듈 초기화 과정에서 각 인터페이스에 대응하는 노드를 만든다.

vpp/src/plugins/dpdk/devices/device.c:

static_always_inline
  u32 tx_burst_vector_internal (vlib_main_t * vm,
                                dpdk_device_t * xd,
                                struct rte_mbuf **tx_vector)
{
  ...
  do
    {
      ...
      rv = rte_eth_tx_burst (xd->device_index,
                             (uint16_t) queue_id,
                             &tx_vector[tx_tail],
                             (uint16_t) (tx_head - tx_tail));
      ...
    }
  while (rv && n_packets && (n_retry > 0));

  return n_packets;
}

uword
CLIB_MULTIARCH_FN (dpdk_interface_tx) (vlib_main_t * vm,
                                       vlib_node_runtime_t * node,
                                       vlib_frame_t * f)
{
  ...
  n_packets = tx_burst_vector_internal (vm, xd, tx_vector);
  ...
}

VNET_DEVICE_CLASS (dpdk_device_class) = {
  .name = "dpdk",
  .tx_function = dpdk_interface_tx,
  ...
};

vpp/src/vnet/interface.c:

u32
vnet_register_interface (vnet_main_t * vnm,
                         u32 dev_class_index,
                         u32 dev_instance,
                         u32 hw_class_index, u32 hw_instance)
{
  ...
  else
    {
      vlib_node_registration_t r;
      ...

      memset (&r, 0, sizeof (r));
      r.type = VLIB_NODE_TYPE_INTERNAL;
      ...
      r.flags = VLIB_NODE_FLAG_IS_OUTPUT;
      r.name = tx_node_name;
      r.function = dev_class->tx_function;

      hw->tx_node_index = vlib_register_node (vm, &r);
      ...
    }
  ...
}

패킷의 종착지로 네트워크 장치만 있는 건 아니다. 오류가 있거나 차단해야 할 패킷이면 버려야 하고, 컨트롤 플레인에서 처리할 패킷이면 그리 보내야 한다. 이 역시 노드이다.

vpp/src/vnet/interface_output.c:

static_always_inline uword
process_drop_punt(vlib_main_t * vm,
                  vlib_node_runtime_t * node,
                  vlib_frame_t * frame, vnet_error_disposition_t disposition)
{
  ...

  if (disposition == VNET_ERROR_DISPOSITION_DROP || !vm->os_punt_frame)
    {
      vlib_buffer_free (vm, first_buffer, frame->n_vectors);

      /* If there is no punt function, free the frame as well. */
      if (disposition == VNET_ERROR_DISPOSITION_PUNT && !vm->os_punt_frame)
        vlib_frame_free (vm, node, frame);
    }
  else
    vm->os_punt_frame (vm, node, frame);

  return frame->n_vectors;
}

static uword
process_drop (...)
{
  ...
  return process_drop_punt (vm, node, frame, VNET_ERROR_DISPOSITION_DROP);
}

static uword
process_punt (...)
{
  return process_drop_punt (vm, node, frame, VNET_ERROR_DISPOSITION_PUNT);
}

VLIB_REGISTER_NODE (drop_buffers,static) = {
  .function = process_drop,
  .name = "error-drop",
  .flags = VLIB_NODE_FLAG_IS_DROP,
  ...
};

VLIB_REGISTER_NODE (punt_buffers,static) = {
  .function = process_punt,
  .flags = (VLIB_NODE_FLAG_FRAME_NO_FREE_AFTER_DISPATCH
            | VLIB_NODE_FLAG_IS_PUNT),
  .name = "error-punt",
  ...
};

패킷을 컨트롤 플레인이나 다른 호스트 쪽으로 뻥 차는(punt) 동작은 vm->os_punt_frame에서 구현한다. 가령 TUN/TAP 장치를 통해 기반 운영체제로 보낼 수 있다.

vpp/src/vnet/unix/tuntap.c:

static clib_error_t *
tuntap_config (...)
{
  ...
  vm->os_punt_frame = tuntap_punt_frame;
  ...
}

static void
tuntap_punt_frame (...)
{
  tuntap_tx (vm, node, frame);
  vlib_frame_free (vm, node, frame);
}

static uword
tuntap_tx (...)
{
  ...
  for (i = 0; i < n_packets; i++)
    {
      ...
      if (writev (tm->dev_net_tun_fd, tm->threads[thread_index].iovecs,
                  vec_len (tm->threads[thread_index].iovecs)) < l)
        clib_unix_warning ("writev");

      n_bytes += l;
    }
  ...
}

패킷의 종착지까지 설명했으니…

진짜 끝.

VPP는 큰 프레임워크다. 지금까지 살펴본 것 외에도 패킷 처리 경로를 바꿀 수 있는 ‘Feature’, 플러그인 구조, 기본 플러그인들 중 하나인 NAT64 모듈, 세션 관리 구조, 스트림 프로토콜 흐름 제어, 암호 연산 하드웨어 지원, C/C++/Java/Lua/Python API, 디버깅 지원 등, 많은 서브시스템과 애스팩트들을 담고 있다. 오랜 시간 동안 많은 고민과 노력으로 만들어진 결과물이다.

간단한 네트워크 응용을 효율적으로 만들려고 할 때는 DPDK 정도로 충분하다. 그보다 복잡한 걸 만들 때 VPP가 등장할 수 있는데 큰 프레임워크이니만큼 진입 장벽이 높다. 일반적인 커널 네트워크 스택과 구조가 많이 다르기에 기존 응용을 포팅 하는 건 삽질이다. 하지만 뭔가를 새로 만들 때라면 커널을 건드리는 것의 대안으로 선택할 만하다.

(일부 개발자에게 첫 번째 진입 장벽은 GNU 스타일과 비슷한 그 코딩 스타일일 수 있다. 시스코의 다른 코드를 보면 공식 스타일이거나 한 건 아닌 모양인데…)

그간의 알고리즘 개선, 코드 최적화, 오버헤드 제거에 이어 VPP는 캐시 활용률 개선을 통해 최대 성능을 높인다. 또 다른 개선 방법이 가능할까? 한편으로, 벡터 처리 방식을 다른 응용에 도입해 성능을 올릴 수 있을까?