summaryrefslogtreecommitdiff
path: root/kernel/bpf/ringbuf.c
diff options
context:
space:
mode:
Diffstat (limited to 'kernel/bpf/ringbuf.c')
-rw-r--r--kernel/bpf/ringbuf.c114
1 files changed, 95 insertions, 19 deletions
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index d706c4b7f532..f6a075ffac63 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -13,7 +13,7 @@
#include <linux/btf_ids.h>
#include <asm/rqspinlock.h>
-#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
+#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_RB_OVERWRITE)
/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
@@ -30,6 +30,7 @@ struct bpf_ringbuf {
u64 mask;
struct page **pages;
int nr_pages;
+ bool overwrite_mode;
rqspinlock_t spinlock ____cacheline_aligned_in_smp;
/* For user-space producer ring buffers, an atomic_t busy bit is used
* to synchronize access to the ring buffers in the kernel, rather than
@@ -73,6 +74,7 @@ struct bpf_ringbuf {
unsigned long consumer_pos __aligned(PAGE_SIZE);
unsigned long producer_pos __aligned(PAGE_SIZE);
unsigned long pending_pos;
+ unsigned long overwrite_pos; /* position after the last overwritten record */
char data[] __aligned(PAGE_SIZE);
};
@@ -166,7 +168,7 @@ static void bpf_ringbuf_notify(struct irq_work *work)
* considering that the maximum value of data_sz is (4GB - 1), there
* will be no overflow, so just note the size limit in the comments.
*/
-static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
+static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node, bool overwrite_mode)
{
struct bpf_ringbuf *rb;
@@ -183,17 +185,25 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
rb->consumer_pos = 0;
rb->producer_pos = 0;
rb->pending_pos = 0;
+ rb->overwrite_mode = overwrite_mode;
return rb;
}
static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
+ bool overwrite_mode = false;
struct bpf_ringbuf_map *rb_map;
if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
return ERR_PTR(-EINVAL);
+ if (attr->map_flags & BPF_F_RB_OVERWRITE) {
+ if (attr->map_type != BPF_MAP_TYPE_RINGBUF)
+ return ERR_PTR(-EINVAL);
+ overwrite_mode = true;
+ }
+
if (attr->key_size || attr->value_size ||
!is_power_of_2(attr->max_entries) ||
!PAGE_ALIGNED(attr->max_entries))
@@ -205,7 +215,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
bpf_map_init_from_attr(&rb_map->map, attr);
- rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
+ rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node, overwrite_mode);
if (!rb_map->rb) {
bpf_map_area_free(rb_map);
return ERR_PTR(-ENOMEM);
@@ -295,13 +305,26 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma
return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
}
+/*
+ * Return an estimate of the available data in the ring buffer.
+ * Note: the returned value can exceed the actual ring buffer size because the
+ * function is not synchronized with the producer. The producer acquires the
+ * ring buffer's spinlock, but this function does not.
+ */
static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
- unsigned long cons_pos, prod_pos;
+ unsigned long cons_pos, prod_pos, over_pos;
cons_pos = smp_load_acquire(&rb->consumer_pos);
- prod_pos = smp_load_acquire(&rb->producer_pos);
- return prod_pos - cons_pos;
+
+ if (unlikely(rb->overwrite_mode)) {
+ over_pos = smp_load_acquire(&rb->overwrite_pos);
+ prod_pos = smp_load_acquire(&rb->producer_pos);
+ return prod_pos - max(cons_pos, over_pos);
+ } else {
+ prod_pos = smp_load_acquire(&rb->producer_pos);
+ return prod_pos - cons_pos;
+ }
}
static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
@@ -404,11 +427,43 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
return (void*)((addr & PAGE_MASK) - off);
}
+static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
+ unsigned long new_prod_pos,
+ unsigned long cons_pos,
+ unsigned long pend_pos)
+{
+ /*
+ * No space if oldest not yet committed record until the newest
+ * record span more than (ringbuf_size - 1).
+ */
+ if (new_prod_pos - pend_pos > rb->mask)
+ return false;
+
+ /* Ok, we have space in overwrite mode */
+ if (unlikely(rb->overwrite_mode))
+ return true;
+
+ /*
+ * No space if producer position advances more than (ringbuf_size - 1)
+ * ahead of consumer position when not in overwrite mode.
+ */
+ if (new_prod_pos - cons_pos > rb->mask)
+ return false;
+
+ return true;
+}
+
+static u32 bpf_ringbuf_round_up_hdr_len(u32 hdr_len)
+{
+ hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
+ return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
+}
+
static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
- unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
+ unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos, flags;
struct bpf_ringbuf_hdr *hdr;
- u32 len, pg_off, tmp_size, hdr_len;
+ u32 len, pg_off, hdr_len;
if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
return NULL;
@@ -431,24 +486,43 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
hdr_len = READ_ONCE(hdr->len);
if (hdr_len & BPF_RINGBUF_BUSY_BIT)
break;
- tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
- tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
- pend_pos += tmp_size;
+ pend_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
}
rb->pending_pos = pend_pos;
- /* check for out of ringbuf space:
- * - by ensuring producer position doesn't advance more than
- * (ringbuf_size - 1) ahead
- * - by ensuring oldest not yet committed record until newest
- * record does not span more than (ringbuf_size - 1)
- */
- if (new_prod_pos - cons_pos > rb->mask ||
- new_prod_pos - pend_pos > rb->mask) {
+ if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
return NULL;
}
+ /*
+ * In overwrite mode, advance overwrite_pos when the ring buffer is full.
+ * The key points are to stay on record boundaries and consume enough records
+ * to fit the new one.
+ */
+ if (unlikely(rb->overwrite_mode)) {
+ over_pos = rb->overwrite_pos;
+ while (new_prod_pos - over_pos > rb->mask) {
+ hdr = (void *)rb->data + (over_pos & rb->mask);
+ hdr_len = READ_ONCE(hdr->len);
+ /*
+ * The bpf_ringbuf_has_space() check above ensures we won’t
+ * step over a record currently being worked on by another
+ * producer.
+ */
+ over_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
+ }
+ /*
+ * smp_store_release(&rb->producer_pos, new_prod_pos) at
+ * the end of the function ensures that when consumer sees
+ * the updated rb->producer_pos, it always sees the updated
+ * rb->overwrite_pos, so when consumer reads overwrite_pos
+ * after smp_load_acquire(r->producer_pos), the overwrite_pos
+ * will always be valid.
+ */
+ WRITE_ONCE(rb->overwrite_pos, over_pos);
+ }
+
hdr = (void *)rb->data + (prod_pos & rb->mask);
pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
hdr->len = size | BPF_RINGBUF_BUSY_BIT;
@@ -578,6 +652,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
return smp_load_acquire(&rb->consumer_pos);
case BPF_RB_PROD_POS:
return smp_load_acquire(&rb->producer_pos);
+ case BPF_RB_OVERWRITE_POS:
+ return smp_load_acquire(&rb->overwrite_pos);
default:
return 0;
}