Librdkafka的基础数据结构

  • 我们在前面的《Librdkafka的基础数据结构 3 — Buffer相关》中
    介绍了Buffer及其组成部分segment、只读映射slice的相关定义和操作;
  • 这一篇我们来介绍librdkafka中rd_buf_t的实际使用,
    实际上是通过rd_kafka_buf_s的封装来实现的;
  • 包括:
    1. struct rd_kafka_buf_s
    2. struct rd_kafka_bufq_t
  • Temporary buffer
  • Buffer segment
  • Buffer
  • Buffer slice

临时写入缓冲区 (Temporary buffer)
  • 所在文件: src/rdkafka_buf.h
  • 写操作缓冲区, 每次分配后写入位置都按8字节向上取整, 保证了8字节内存对齐,
    以便提高内存读取效率和跨平台的安全性;
  • 定义:

/**
 * Temporary write buffer.
 *
 * Space is handed out sequentially from \c buf; the write offset \c of
 * is advanced in 8-byte steps by rd_tmpabuf_alloc0().
 */
typedef struct rd_tmpabuf_s {
        size_t size;           /**< Capacity of the internal buffer \c buf */
        size_t of;             /**< Current write offset into \c buf */
        char  *buf;            /**< Internal buffer */
        int    failed;         /**< Error flag: while set, further
                                *   allocations return NULL */
        int    assert_on_fail; /**< If set, an allocation that does not
                                *   fit triggers an assert() */
} rd_tmpabuf_t;
  • 初始化:

static RD_UNUSED voidrd_tmpabuf_new (rd_tmpabuf_t *tab, size_t size, int assert_on_fail) { tab->buf = rd_malloc; //分配内部缓冲区 tab->size = size; //buf即内部缓冲区容量 tab->of = 0; tab->failed = 0; tab->assert_on_fail = assert_on_fail;}
  • 根据写入大小返回可供写入的内存地址:

/**
 * Reserve \p size bytes in \p tab and return a pointer to them.
 *
 * The write offset is advanced by \p size rounded up with
 * RD_ROUNDUP(size, 8), so handed-out offsets stay 8-byte aligned.
 *
 * @param func Caller function name, only used in the diagnostic message.
 * @param line Caller line number, only used in the diagnostic message.
 * @param tab  Temporary buffer to allocate from.
 * @param size Number of bytes requested.
 *
 * @returns a pointer into the internal buffer, or NULL if the error flag is
 *          already set or there is not enough room left (the error flag is
 *          then set so later allocations fail too).
 */
static RD_UNUSED void *
rd_tmpabuf_alloc0 (const char *func, int line, rd_tmpabuf_t *tab,
                   size_t size) {
        void *ptr;

        /* Error flag already set: refuse any further allocation. */
        if (unlikely(tab->failed))
                return NULL;

        /* Not enough room left. `of` may already exceed `size` after
         * rounding up, and `of + size` could wrap around, so compare
         * without adding. */
        if (unlikely(tab->of > tab->size || size > tab->size - tab->of)) {
                if (tab->assert_on_fail) {
                        fprintf(stderr,
                                "%s: %s:%d: requested size %zu + %zu > %zu\n",
                                __FUNCTION__, func, line,
                                tab->of, size, tab->size);
                        assert(!*"rd_tmpabuf_alloc: not enough size in buffer");
                }
                /* Make the failure sticky so callers can check the flag
                 * once after a series of allocations. */
                tab->failed = 1;
                return NULL;
        }

        ptr = (void *)(tab->buf + tab->of);
        /* Advance the write offset, keeping it 8-byte aligned. */
        tab->of += RD_ROUNDUP(size, 8);

        return ptr;
}
  • buffer写入:

/**
 * Copy \p size bytes from \p buf into space reserved in \p tab.
 *
 * @returns the destination pointer inside \p tab, or NULL if
 *          rd_tmpabuf_alloc0() could not reserve the space (nothing is
 *          copied in that case).
 */
static RD_UNUSED void *
rd_tmpabuf_write0 (const char *func, int line, rd_tmpabuf_t *tab,
                   const void *buf, size_t size) {
        /* Reserve space for the write; NULL on failure. */
        void *ptr = rd_tmpabuf_alloc0(func, line, tab, size);

        if (ptr)
                memcpy(ptr, buf, size);

        return ptr;
}
struct rd_kafka_buf_s
  • 所在文件: src/rdkafka_buf.h
  • 这个结构涉及的操作非常多, 我们会在后面随着对代码的深入了解来作补充和修改
  • 定义:

/**
 * Kafka protocol request/response buffer (rd_kafka_buf_t).
 *
 * Wraps an rd_buf_t (rkbuf_buf) and a read-only slice over it
 * (rkbuf_reader), plus per-request state: request/response headers,
 * reply queue and callback, associated broker, and retry/timing
 * bookkeeping.
 */
struct rd_kafka_buf_s { /* rd_kafka_buf_t */
    TAILQ_ENTRY(rd_kafka_buf_s) rkbuf_link; // Link field: makes rd_kafka_buf_s usable as a TAILQ element

    int32_t rkbuf_corrid;  // Corresponds to the CorrelationId in the Kafka request header

    rd_ts_t rkbuf_ts_retry;    /* Absolute send retry time */ // absolute time at which the request send is retried

    int     rkbuf_flags; /* RD_KAFKA_OP_F */

        rd_buf_t rkbuf_buf;        /**< Send/Recv byte buffer */  // rd_buf_t holding the data to send or the data received
        rd_slice_t rkbuf_reader;   /**< Buffer slice reader for rkbuf_buf */ // read-only view over the rd_buf_t above

    int     rkbuf_connid;      /* broker connection id (used when buffer
                    * was partially sent). */
        size_t  rkbuf_totlen;      /* recv: total expected length,
                                    * send: not used */   // When receiving a response, the first four bytes of the Kafka protocol frame give the payload length; this holds that length, i.e. how much data still has to be received

    rd_crc32_t rkbuf_crc;      /* Current CRC calculation */ // running CRC checksum

    struct rd_kafkap_reqhdr rkbuf_reqhdr;   /* Request header.
                                                 * These fields are encoded
                                                 * and written to output buffer
                                                 * on buffer finalization. */
    struct rd_kafkap_reshdr rkbuf_reshdr;   /* Response header.
                                                 * Decoded fields are copied
                                                 * here from the buffer
                                                 * to provide an ease-of-use
                                                 * interface to the header */

    int32_t rkbuf_expected_size;  /* expected size of message */

        // Queue on which the response is enqueued
        rd_kafka_replyq_t   rkbuf_replyq;       /* Enqueue response on replyq */
        rd_kafka_replyq_t   rkbuf_orig_replyq;  /* Original replyq to be used
                                                 * for retries from inside
                                                 * the rkbuf_cb() callback
                                                 * since rkbuf_replyq will
                                                 * have been reset. */
        rd_kafka_resp_cb_t *rkbuf_cb;           /* Response callback */
        struct rd_kafka_buf_s *rkbuf_response;  /* Response buffer */

        struct rd_kafka_broker_s *rkbuf_rkb;  // Associated broker

    rd_refcnt_t rkbuf_refcnt; // Reference count
    void   *rkbuf_opaque;

       // Number of retries
    int     rkbuf_retries;            /* Retries so far. */
#define RD_KAFKA_BUF_NO_RETRIES  1000000  /* Do not retry */

        int     rkbuf_features;   /* Required feature(s) that must be
                                   * supported by broker. */

    rd_ts_t rkbuf_ts_enq;
    rd_ts_t rkbuf_ts_sent;    /* Initially: Absolute time of transmission,
                   * after response: RTT. */
    rd_ts_t rkbuf_ts_timeout;

        int64_t rkbuf_offset;     /* Used by OffsetCommit */  // offset to be committed

    rd_list_t *rkbuf_rktp_vers;    /* Toppar + Op Version map.
                    * Used by FetchRequest. */

    rd_kafka_msgq_t rkbuf_msgq;

        rd_kafka_resp_err_t rkbuf_err;      /* Buffer parsing error code */

        union {
                struct {
                        rd_list_t *topics;  /* Requested topics (char *) */
                        char *reason;       /* Textual reason */
                        rd_kafka_op_t *rko; /* Originating rko with replyq
                                             * (if any) */
                        int all_topics;     /* Full/All topics requested */

                        int *decr;          /* Decrement this integer by one
                                             * when request is complete:
                                             * typically points to metadata
                                             * cache's full_.._sent.
                                             * Will be performed with
                                             * decr_lock held. */
                        mtx_t *decr_lock;

                } Metadata;
        } rkbuf_u;
};
  • 为kafka request创建rd_kafka_buf_s,
    kafka request的header在这个函数中被自动加上

/**
 * Create a new rd_kafka_buf_t for a Kafka request with API key \p ApiKey.
 *
 * The common request header is written automatically: Length, ApiKey,
 * ApiVersion, CorrId and ClientId. Length, ApiVersion and CorrId are
 * written as 0 placeholders to be updated later.
 *
 * @param rkb    Broker the request is for; stored in rkbuf_rkb and
 *               rd_kafka_broker_keep() is called on it (presumably taking
 *               a reference).
 * @param ApiKey Kafka API key of the request.
 * @param segcnt Number of segments the caller needs; one extra segment is
 *               added for the headers.
 * @param size   Payload size the caller needs; the header size (including
 *               the ClientId string) is added.
 */
rd_kafka_buf_t *rd_kafka_buf_new_request (rd_kafka_broker_t *rkb, int16_t ApiKey,
                                          int segcnt, size_t size) {
        rd_kafka_buf_t *rkbuf;

        /* Make room for common protocol request headers,
         * including the ClientId string. */
        size += RD_KAFKAP_REQHDR_SIZE +
                RD_KAFKAP_STR_SIZE(rkb->rkb_rk->rk_client_id);

        /* One extra segment for the headers. */
        segcnt += 1; /* headers */

        /* Create the buffer with the given segment count and size. */
        rkbuf = rd_kafka_buf_new0(segcnt, size, 0);

        rkbuf->rkbuf_rkb = rkb;
        rd_kafka_broker_keep(rkb);

        rkbuf->rkbuf_reqhdr.ApiKey = ApiKey;

        /* Write request header, will be updated later. */
        /* Length: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);
        /* ApiKey */
        rd_kafka_buf_write_i16(rkbuf, rkbuf->rkbuf_reqhdr.ApiKey);
        /* ApiVersion: updated later */
        rd_kafka_buf_write_i16(rkbuf, 0);
        /* CorrId: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* ClientId */
        rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id);

        return rkbuf;
}
  • 写操作 rd_kafka_buf_write:
    调用rd_buf_write来实现

/**
 * Append \p len bytes from \p data to the request/response buffer.
 *
 * When CRC calculation is enabled on the buffer (RD_KAFKA_OP_F_CRC), the
 * running CRC in rkbuf_crc is updated with the written bytes.
 *
 * @returns the value returned by rd_buf_write() (the absolute offset at
 *          which the data was written).
 */
static RD_INLINE size_t rd_kafka_buf_write (rd_kafka_buf_t *rkbuf,
                                        const void *data, size_t len) {
        const size_t written_at = rd_buf_write(&rkbuf->rkbuf_buf, data, len);
        const int crc_enabled = (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC) != 0;

        if (crc_enabled)
                rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc,
                                                   data, len);

        return written_at;
}
  • 更新buffer中的局部数据rd_kafka_buf_update:
    调用rd_buf_write_update来实现

/**
 * Overwrite \p len bytes at absolute offset \p of in the buffer with
 * \p data.
 *
 * Must not be used while CRC calculation is enabled (RD_KAFKA_OP_F_CRC);
 * presumably because the running CRC in rkbuf_crc is only maintained for
 * appended data.
 */
static RD_INLINE void rd_kafka_buf_update (rd_kafka_buf_t *rkbuf, size_t of,
                                          const void *data, size_t len) {
        rd_buf_t *const target = &rkbuf->rkbuf_buf;

        rd_kafka_assert(NULL, !(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC));
        rd_buf_write_update(target, of, data, len);
}
  • push操作 rd_kafka_buf_push0
    调用rd_buf_push来实现, buf的数据不会被copy, 只作简单的指针赋值,
    rd_buf_push会先看当前正在写入的segment还有没有剩余空间,
    如果有的话将剩余空间单独拆出来, 生成一个新的segment,
    然后再生成一个新的segment来存放需要写入的buf,
    append到rd_kafka_buf_t上, 最后再把拆出来的segment再append

/**
 * Push \p len bytes at \p buf onto the buffer without copying them
 * (see rd_buf_push()).
 *
 * @param allow_crc_calc If non-zero and CRC calculation is enabled on the
 *                       buffer (RD_KAFKA_OP_F_CRC), the running CRC is
 *                       updated with the pushed bytes.
 * @param free_cb        Optional function used to free \p buf later.
 */
void rd_kafka_buf_push0 (rd_kafka_buf_t *rkbuf, const void *buf, size_t len,
                         int allow_crc_calc, void (*free_cb) (void *)) {
        int update_crc;

        rd_buf_push(&rkbuf->rkbuf_buf, buf, len, free_cb);

        update_crc = allow_crc_calc &&
                     (rkbuf->rkbuf_flags & RD_KAFKA_OP_F_CRC);
        if (!update_crc)
                return;

        rkbuf->rkbuf_crc = rd_crc32_update(rkbuf->rkbuf_crc, buf, len);
}
  • 针对不同类型的写入操作 rd_kafka_buf_write_*
  • buffer的retry操作, request发送失败后可能会重试

/**
 * Try to schedule \p rkbuf for another send attempt on broker \p rkb.
 *
 * No retry is done when there is no broker, the broker is the internal
 * one, the client instance is terminating, or one more retry would exceed
 * the configured max_retries.
 *
 * @returns 1 if the buffer was handed to rd_kafka_broker_buf_retry(),
 *          else 0.
 */
int rd_kafka_buf_retry (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
        /* Checks are evaluated in order; rkb is NULL-checked before it
         * is dereferenced. */
        if (unlikely(!rkb))
                return 0;
        if (unlikely(rkb->rkb_source == RD_KAFKA_INTERNAL))
                return 0;
        if (unlikely(rd_kafka_terminating(rkb->rkb_rk)))
                return 0;
        if (unlikely(rkbuf->rkbuf_retries + 1 >
                     rkb->rkb_rk->rk_conf.max_retries))
                return 0;

        /* Try again: reset the send timestamp and count the retry. */
        rkbuf->rkbuf_ts_sent = 0;
        rkbuf->rkbuf_retries++;

        /* Presumably takes a reference for the retry queue before handing
         * the buffer to the broker's retry handling. */
        rd_kafka_buf_keep(rkbuf);
        rd_kafka_broker_buf_retry(rkb, rkbuf);

        return 1;
}
  • 处理
    RD_KAFKA_OP_RECV_BUF类型的buffer(实际上是rd_kafka_op_t中的)

/**
 * Handle an op carrying a request buffer (RD_KAFKA_OP_RECV_BUF):
 * take the request out of \p rko and invoke its response callback with
 * \p err and the attached response (which may be NULL).
 *
 * If the request has no callback it is simply destroyed.
 */
void rd_kafka_buf_handle_op (rd_kafka_op_t *rko, rd_kafka_resp_err_t err) {
        rd_kafka_buf_t *req = rko->rko_u.xbuf.rkbuf;
        rd_kafka_buf_t *resp;

        /* Detach the request from the op. */
        rko->rko_u.xbuf.rkbuf = NULL;

        /* NULL on op_destroy() */
        if (req->rkbuf_replyq.q) {
                /* clear() also resets the version, so remember it first:
                 * the callback might need to version check. */
                const int32_t saved_version = req->rkbuf_replyq.version;

                /* Current queue usage is done, but retain original replyq
                 * for future retries, stealing the current reference. */
                req->rkbuf_orig_replyq = req->rkbuf_replyq;
                rd_kafka_replyq_clear(&req->rkbuf_replyq);
                req->rkbuf_replyq.version = saved_version;
        }

        if (!req->rkbuf_cb) {
                rd_kafka_buf_destroy(req);
                return;
        }

        /* Let buf_callback() do destroy()s */
        resp = req->rkbuf_response; /* May be NULL */
        req->rkbuf_response = NULL;

        rd_kafka_buf_callback(req->rkbuf_rkb->rkb_rk, req->rkbuf_rkb, err,
                              resp, req);
}
Buffer segment
  • 所在文件: src/rdbuf.h (定义), src/rdbuf.c (实现)
  • 顾名思义, buffer segment是buffer的组成部分,
    buffer是我们下一节要介绍的rd_buf_t;
  • 定义:

/**
 * Buffer segment: one contiguous piece of memory that is part of an
 * rd_buf_t.
 */
typedef struct rd_segment_s {
        TAILQ_ENTRY(rd_segment_s) seg_link; /**< rbuf_segments link
                                             *   (TAILQ element) */
        char   *seg_p;       /**< Backing-store memory holding the data */
        size_t  seg_of;      /**< Current relative write-position
                              *   (length of payload in this segment) */
        size_t  seg_size;    /**< Allocated size of seg_p */
        size_t  seg_absof;   /**< Absolute offset of this segment's
                              *   beginning in the grand rd_buf_t */
        void  (*seg_free) (void *p); /**< Optional free function for seg_p */
        int     seg_flags;   /**< Segment flags */
#define RD_SEGMENT_F_RDONLY   0x1    /**< Read-only segment */
#define RD_SEGMENT_F_FREE     0x2    /**< Free segment on destroy,
                                      *   e.g, not a fixed segment. */
} rd_segment_t;
  • 初始化:

/**
 * Initialize segment \p seg to wrap \p size bytes of memory at \p mem.
 * All other fields (write offset, absolute offset, flags, free callback)
 * are zeroed.
 */
static void rd_segment_init (rd_segment_t *seg, void *mem, size_t size) {
        memset(seg, 0, sizeof(*seg));
        seg->seg_p    = mem;
        seg->seg_size = size;
}
  • 销毁:

/**
 * Destroy segment \p seg.
 *
 * The payload is released through seg_free, if one was provided. The
 * segment struct itself is freed only when RD_SEGMENT_F_FREE is set
 * (per the flag's definition: not a fixed segment).
 */
static void rd_segment_destroy (rd_segment_t *seg) {
        /* Free payload */
        if (seg->seg_free && seg->seg_p)
                seg->seg_free(seg->seg_p);

        if (seg->seg_flags & RD_SEGMENT_F_FREE)
                rd_free(seg);
}
  • 判断当前segment是否还有剩余空间, 并返回当前写指针,
    返回值表示当前还可写入的长度

static RD_INLINE RD_UNUSED size_trd_segment_write_remains (const rd_segment_t *seg, void **p) { if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY))) return 0; if  *p = (seg->seg_p + seg->seg_of); return seg->seg_size - seg->seg_of;}
struct rd_kafka_bufq_t
  • 所在文件: src/rdkafka_buf.h
  • 将上面的rd_kafka_buf_s封装成队列
  • 定义:

/**
 * Queue of rd_kafka_buf_t (presumably linked through each buffer's
 * rkbuf_link TAILQ entry).
 */
typedef struct rd_kafka_bufq_s {
    TAILQ_HEAD(, rd_kafka_buf_s) rkbq_bufs; /* Queued buffers */
    rd_atomic32_t  rkbq_cnt;     /* presumably the number of queued buffers — confirm */
    rd_atomic32_t  rkbq_msg_cnt; /* presumably the total message count of queued buffers — confirm */
} rd_kafka_bufq_t;
  • 提供出队,入队, 删除等操作

主角 Buffer
  • 所在文件: src/rdbuf.h (定义), src/rdbuf.c (实现)
  • 数据Buffer, 内含一个由多个rd_segment_t组成的list
  • 定义:

typedef struct rd_buf_s { struct rd_segment_head rbuf_segments; /**< TAILQ list of segments */ segment tailq的头指针 size_t rbuf_segment_cnt; /**< Number of segments */ segment个数 rd_segment_t *rbuf_wpos; /**< Current write position seg */ 当前正在写入的segment size_t rbuf_len; /**< Current  length */ 当前写入数据的总大小 size_t rbuf_size; /**< Total allocated size of * all segments. */所有segment的存储空间大小 char *rbuf_extra; /* Extra memory allocated for * use by segment structs, * buffer memory, etc. */ 可以理解成一个预分配的内存池 size_t rbuf_extra_len; /* Current extra memory used */ size_t rbuf_extra_size; /* Total size of extra memory */} rd_buf_t;
  • 确保当前写入的segment能够写入size大小的数据:

/**
 * Make sure the current write segment can take \p size contiguous bytes.
 *
 * If there is no current write segment, or it has fewer than \p size
 * writable bytes left, a new segment is allocated and becomes the current
 * write segment.
 */
void rd_buf_write_ensure_contig (rd_buf_t *rbuf, size_t size) {
        rd_segment_t *seg = rbuf->rbuf_wpos;

        if (seg) {
                void *p;
                size_t remains = rd_segment_write_remains(seg, &p);

                if (remains >= size)
                        return; /* Existing segment has enough space. */

                /* Future optimization:
                 * If existing segment has enough remaining space to warrant
                 * a split, do it, before allocating a new one. */
        }

        /* Allocate new segment and make it the current write segment. */
        rbuf->rbuf_wpos = rd_buf_alloc_segment(rbuf, size, size);
}
  • 根据指定的offset来获取其所在的segment:

rd_segment_t *rd_buf_get_segment_at_offset (const rd_buf_t *rbuf, const rd_segment_t *hint, size_t absof) { // hint是为了少遍历segment list, 给定一个offset可以命中的segment const rd_segment_t *seg = hint; if (unlikely(absof > rbuf->rbuf_len)) return NULL; /* Only use current write position if possible and if it helps */ if (!seg || absof < seg->seg_absof) seg = TAILQ_FIRST(&rbuf->rbuf_segments); // 遍历查找 do { if (absof >= seg->seg_absof && absof < seg->seg_absof + seg->seg_of) { rd_dassert(seg->seg_absof <= rd_buf_len; return (rd_segment_t *)seg; } } while ((seg = TAILQ_NEXT(seg, seg_link))); return NULL;}
  • 将segment按指定的offset分离成两个独立的segment,
    这是一个内部函数, 按源码里的注释目前只用于最后一个segment的空闲存储空间分离

/**
 * Split segment \p seg at absolute offset \p absof into two segments.
 *
 * \p seg keeps the bytes before \p absof. The returned new segment gets
 * the bytes and remaining capacity from \p absof onwards. The new segment
 * is not yet linked into the buffer: its seg_absof is set to SIZE_MAX
 * (invalid), its length is subtracted from rbuf_len (append_segment() adds
 * it back) and its size from rbuf_size.
 *
 * Internal helper: \p seg must be the current write segment.
 */
static rd_segment_t *
rd_segment_split (rd_buf_t *rbuf, rd_segment_t *seg, size_t absof) {
        rd_segment_t *newseg;
        size_t relof;

        rd_assert(seg == rbuf->rbuf_wpos);
        rd_assert(absof >= seg->seg_absof &&
                  absof <= seg->seg_absof + seg->seg_of);

        /* Split position relative to the start of seg */
        relof = absof - seg->seg_absof;

        newseg = rd_buf_alloc_segment0(rbuf, 0);

        /* Add later part of split bytes to new segment */
        newseg->seg_p      = seg->seg_p+relof;
        newseg->seg_of     = seg->seg_of-relof;
        newseg->seg_size   = seg->seg_size-relof;
        newseg->seg_absof  = SIZE_MAX; /* Invalid */
        newseg->seg_flags |= seg->seg_flags;

        /* Remove earlier part of split bytes from previous segment */
        seg->seg_of        = relof;
        seg->seg_size      = relof;

        /* newseg's length will be added to rbuf_len in append_segment(),
         * so shave it off here from seg's perspective. */
        rbuf->rbuf_len   -= newseg->seg_of;
        rbuf->rbuf_size  -= newseg->seg_size;

        return newseg;
}
  • buffer 初始化:

/**
 * Initialize buffer \p rbuf.
 *
 * @param fixed_seg_cnt Number of segment structs to pre-allocate room for.
 *                      If 0, nothing is pre-allocated and \p buf_size must
 *                      also be 0.
 * @param buf_size      Extra backing-buffer memory to pre-allocate.
 *
 * The pre-allocated block is stored in rbuf_extra (rbuf_extra_size bytes).
 */
void rd_buf_init (rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size) {
        size_t totalloc = 0;

        memset(rbuf, 0, sizeof(*rbuf));
        TAILQ_INIT(&rbuf->rbuf_segments);

        if (!fixed_seg_cnt) {
                assert(!buf_size);
                return;
        }

        /* Pre-allocate memory for a fixed set of segments that are known
         * before-hand, to minimize the number of extra allocations
         * needed for well-known layouts (such as headers, etc) */
        totalloc += RD_ROUNDUP(sizeof(rd_segment_t), 8) * fixed_seg_cnt;

        /* Pre-allocate extra space for the backing buffer. */
        totalloc += buf_size;

        rbuf->rbuf_extra_size = totalloc;
        rbuf->rbuf_extra = rd_malloc(rbuf->rbuf_extra_size);
}
  • 获取当前可用于写入数据的内存指针,
    返回值是获取到的写入指针所在segment的剩余空间

/**
 * Find the first writable segment, starting at the current write segment.
 *
 * rbuf_wpos is moved to each segment inspected, so future scans start at
 * the first segment that still has room.
 *
 * @param segp If non-NULL, set to the segment found.
 * @param p    Passed to rd_segment_write_remains() to receive the write
 *             pointer within that segment.
 *
 * @returns the number of writable bytes left in the segment found, or 0 if
 *          no segment has room.
 */
static size_t
rd_buf_get_writable0 (rd_buf_t *rbuf, rd_segment_t **segp, void **p) {
        rd_segment_t *seg;

        for (seg = rbuf->rbuf_wpos ; seg ; seg = TAILQ_NEXT(seg, seg_link)) {
                size_t len = rd_segment_write_remains(seg, p);

                /* Even though the write offset hasn't changed we
                 * avoid future segment scans by adjusting the
                 * wpos here to the first writable segment. */
                rbuf->rbuf_wpos = seg;
                if (segp)
                        *segp = seg;

                if (unlikely(len == 0))
                        continue;

                /* Also adjust absof if the segment was allocated
                 * before the previous segment's memory was exhausted
                 * and thus now might have a lower absolute offset
                 * than the previous segment's now higher relative offset. */
                if (seg->seg_of == 0 && seg->seg_absof < rbuf->rbuf_len)
                        seg->seg_absof = rbuf->rbuf_len;

                return len;
        }

        return 0;
}
  • 写payload到buffer:

/**
 * Append \p size bytes from \p payload to the buffer, spreading them over
 * as many segments as needed. Space is pre-allocated up front.
 *
 * If \p payload is NULL the space is reserved (lengths are advanced) but
 * nothing is copied.
 *
 * @returns the absolute offset at which the written data starts.
 */
size_t rd_buf_write (rd_buf_t *rbuf, const void *payload, size_t size) {
        size_t remains = size;
        size_t initial_absof;
        const char *psrc = (const char *)payload;

        initial_absof = rbuf->rbuf_len;

        /* Ensure enough space by pre-allocating segments. */
        rd_buf_write_ensure(rbuf, size, 0);

        /* Write segment by segment until the whole payload is written. */
        while (remains > 0) {
                void *p;
                rd_segment_t *seg;
                /* Current writable segment, its write pointer, and the
                 * space left in it. */
                size_t segremains = rd_buf_get_writable0(rbuf, &seg, &p);
                size_t wlen = RD_MIN(remains, segremains);

                rd_dassert(seg == rbuf->rbuf_wpos);
                rd_dassert(wlen > 0);
                rd_dassert(seg->seg_p+seg->seg_of <= p &&
                           p < seg->seg_p+seg->seg_size);

                if (payload) {
                        memcpy(p, psrc, wlen);
                        psrc += wlen;
                }

                seg->seg_of    += wlen;
                rbuf->rbuf_len += wlen;
                remains        -= wlen;
        }

        rd_assert(remains == 0);

        return initial_absof;
}
  • 从指定offset开始部分更新buffer

/**
 * Overwrite \p size bytes starting at absolute offset \p absof with
 * \p payload. The range may span several consecutive segments; \p absof
 * must lie within the written part of the buffer.
 *
 * @returns the number of bytes updated (equal to \p size).
 */
size_t rd_buf_write_update (rd_buf_t *rbuf, size_t absof,
                            const void *payload, size_t size) {
        rd_segment_t *seg;
        const char *psrc = (const char *)payload;
        size_t of;

        /* Find segment for offset */
        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        rd_assert(seg && *"invalid absolute offset");

        /* The update may span multiple consecutive segments. */
        for (of = 0 ; of < size ; seg = TAILQ_NEXT(seg, seg_link)) {
                size_t wlen;

                /* Running off the end of the list means the range
                 * extends past the buffer: fail loudly instead of
                 * dereferencing NULL. */
                rd_assert(seg && *"update range exceeds buffer");
                rd_assert(seg->seg_absof <= rd_buf_len(rbuf));

                wlen = rd_segment_write_update(seg, absof+of,
                                               psrc+of, size-of);
                of += wlen;
        }

        rd_dassert(of == size);

        return of;
}
  • buffer的push操作

/**
 * Append \p payload (\p size bytes) to the buffer as a new read-only
 * segment, without copying it.
 *
 * If the current write segment still has free space, that free space is
 * first split off into its own segment, which is appended after the pushed
 * segment.
 *
 * @param free_cb Optional function called on \p payload when the segment
 *                is destroyed.
 */
void rd_buf_push (rd_buf_t *rbuf, const void *payload, size_t size,
                  void (*free_cb) (void *)) {
        rd_segment_t *prevseg, *seg, *tailseg = NULL;

        if ((prevseg = rbuf->rbuf_wpos) &&
            rd_segment_write_remains(prevseg, NULL) > 0) {
                /* If the current segment still has room in it split it
                 * and insert the pushed segment in the middle
                 * (between prevseg and tailseg). */
                tailseg = rd_segment_split(rbuf, prevseg,
                                           prevseg->seg_absof +
                                           prevseg->seg_of);
        }

        /* New read-only segment pointing directly at payload.
         * The cast drops const explicitly; the segment is flagged
         * RD_SEGMENT_F_RDONLY so it is not written to. */
        seg = rd_buf_alloc_segment0(rbuf, 0);
        seg->seg_p      = (char *)payload;
        seg->seg_size   = size;
        seg->seg_of     = size;
        seg->seg_free   = free_cb;
        seg->seg_flags |= RD_SEGMENT_F_RDONLY;

        rd_buf_append_segment(rbuf, seg);

        /* Re-append the split-off free space, if any. */
        if (tailseg)
                rd_buf_append_segment(rbuf, tailseg);
}
  • buffer的seek操作, 其实就是一个truncate操作

/**
 * Truncate the buffer at absolute offset \p absof and make that the new
 * write position. All segments after the one containing \p absof are
 * destroyed.
 *
 * @returns 0 on success, or -1 if \p absof is not within the buffer.
 */
int rd_buf_write_seek (rd_buf_t *rbuf, size_t absof) {
        rd_segment_t *seg, *next;
        size_t relof;

        seg = rd_buf_get_segment_at_offset(rbuf, rbuf->rbuf_wpos, absof);
        if (unlikely(!seg))
                return -1;

        relof = absof - seg->seg_absof;
        if (unlikely(relof > seg->seg_of))
                return -1;

        /* Destroy sub-sequent segments in reverse order so that
         * destroy_segment() length checks are correct.
         * Will decrement rbuf_len et.al.
         * The previous element is fetched *before* destroying the current
         * one: destroying may free it (rd_segment_destroy() frees
         * RD_SEGMENT_F_FREE segments), so reading its link afterwards
         * would be a use-after-free. */
        for (next = TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head) ;
             next != seg ; ) {
                rd_segment_t *this_seg = next;

                next = TAILQ_PREV(this_seg, rd_segment_head, seg_link);
                rd_buf_destroy_segment(rbuf, this_seg);
        }

        /* Update relative write offset */
        seg->seg_of = relof;
        rbuf->rbuf_wpos = seg;
        rbuf->rbuf_len = seg->seg_absof + seg->seg_of;

        rd_assert(rbuf->rbuf_len == absof);

        return 0;
}

Librdkafka源码解析-Content Table

Buffer slice
  • 所在文件: src/rdbuf.h (定义), src/rdbuf.c (实现)
  • 顾名思义, 表示上面介绍的rd_buf_t的一个片段, 只读,
    不会修改buffer的内容;
  • 定义:

typedef struct rd_slice_s { const rd_buf_t *buf; /**< Pointer to buffer */ 映射的rd_buffer_t const rd_segment_t *seg; /**< Current read position segment. * Will point to NULL when end of * slice is reached. */ 当前正在读取的rd_buffer_t中的segment size_t rof; /**< Relative read offset in segment */ 当前正在读取的rd_buffer_t中的segment的读offset size_t start; /**< Slice start offset in buffer */ size_t end; /**< Slice end offset in buffer+1 */} rd_slice_t;
  • 用segment来初始化slice

/**
 * Initialize \p slice to cover \p size bytes of \p rbuf, starting at
 * relative offset \p rof within segment \p seg.
 *
 * @returns 0 on success, or -1 if the written part of \p rbuf does not
 *          extend \p size bytes past that position.
 */
int rd_slice_init_seg (rd_slice_t *slice, const rd_buf_t *rbuf,
                       const rd_segment_t *seg, size_t rof, size_t size) {
        /* Verify that \p size bytes are indeed available in the buffer. */
        if (unlikely(rbuf->rbuf_len < (seg->seg_absof + rof + size)))
                return -1;

        /* NOTE(review): rof is not checked against seg->seg_of here;
         * presumably callers pass an offset within the segment. */
        slice->buf   = rbuf;
        slice->seg   = seg;
        slice->rof   = rof;
        slice->start = seg->seg_absof + rof;
        slice->end   = slice->start + size;

        rd_assert(seg->seg_absof+rof >= slice->start &&
                  seg->seg_absof+rof <= slice->end);
        rd_assert(slice->end <= rd_buf_len(rbuf));

        return 0;
}
  • 按指定的offset来初始化slice

/**
 * Initialize \p slice to cover \p size bytes of \p rbuf starting at
 * absolute offset \p absof.
 *
 * @returns 0 on success, or -1 if \p absof is not within the buffer or
 *          fewer than \p size bytes are available from it.
 */
int rd_slice_init (rd_slice_t *slice, const rd_buf_t *rbuf,
                   size_t absof, size_t size) {
        /* Locate the segment containing absof. */
        const rd_segment_t *seg = rd_buf_get_segment_at_offset(rbuf, NULL,
                                                               absof);
        if (unlikely(!seg))
                return -1;

        return rd_slice_init_seg(slice, rbuf, seg,
                                 absof - seg->seg_absof, size);
}
  • 将整个buffer都映射到slice

/**
 * Initialize \p slice to cover the whole written content of \p rbuf.
 * Asserts that the initialization succeeds.
 */
void rd_slice_init_full (rd_slice_t *slice, const rd_buf_t *rbuf) {
        int r = rd_slice_init(slice, rbuf, 0, rd_buf_len(rbuf));
        rd_assert(r == 0);
}
  • slice的读操作, 这个函数也就是一个slice的iterator遍历接口,
    遍历这个slice对应的每一个segment

/**
 * Iterator-style read interface: point \p *p at the next contiguous chunk
 * of the slice. Segments whose payload is fully consumed are skipped.
 *
 * @param update_pos If non-zero, the slice read position is advanced past
 *                   the returned chunk.
 *
 * @returns the chunk length, or 0 at the end of the slice (\p *p is left
 *          untouched in that case).
 */
size_t rd_slice_reader0 (rd_slice_t *slice, const void **p, int update_pos) {
        size_t rof = slice->rof;
        size_t rlen;
        const rd_segment_t *seg;

        /* Find segment with non-zero payload; rof restarts at 0 for each
         * following segment. */
        for (seg = slice->seg ;
             seg && seg->seg_absof+rof < slice->end && seg->seg_of == rof ;
             seg = TAILQ_NEXT(seg, seg_link))
                rof = 0;

        if (unlikely(!seg || seg->seg_absof+rof >= slice->end))
                return 0;

        rd_assert(seg->seg_absof+rof <= slice->end);

        *p = (const void *)(seg->seg_p + rof);
        rlen = RD_MIN(seg->seg_of - rof, rd_slice_remains(slice));

        if (update_pos) {
                if (slice->seg != seg) {
                        /* Moved on to a new segment (rof is 0 here). */
                        rd_assert(seg->seg_absof + rof >= slice->start &&
                                  seg->seg_absof + rof+rlen <= slice->end);
                        slice->seg = seg;
                        slice->rof = rlen;
                } else {
                        slice->rof += rlen;
                }
        }

        return rlen;
}
  • 读slice到内存buffer

/**
 * Read exactly \p size bytes from the slice into \p dst, chunk by chunk via
 * rd_slice_reader().
 *
 * @param dst Destination buffer; possibly NULL, in which case the data is
 *            consumed without being copied.
 *
 * @returns \p size on success, or 0 if fewer than \p size bytes remain in
 *          the slice (nothing is read in that case).
 */
size_t rd_slice_read (rd_slice_t *slice, void *dst, size_t size) {
        size_t remains = size;
        char *d = dst; /* Possibly NULL */
        size_t rlen;
        const void *p;
        size_t orig_end = slice->end;

        if (unlikely(rd_slice_remains(slice) < size))
                return 0;

        /* Temporarily shrink slice to offset + \p size */
        slice->end = rd_slice_abs_offset(slice) + size;

        /* Iterate over the contiguous chunks of the (shrunk) slice. */
        while ((rlen = rd_slice_reader(slice, &p))) {
                rd_dassert(remains >= rlen);
                if (dst) {
                        memcpy(d, p, rlen);
                        d += rlen;
                }
                remains -= rlen;
        }

        rd_dassert(remains == 0);

        /* Restore original size */
        slice->end = orig_end;

        return size;
}
  • slice的seek操作

/**
 * Move the slice read position to \p offset bytes from the slice start.
 *
 * @returns 0 on success, or -1 if the position is at or beyond the slice
 *          end.
 */
int rd_slice_seek (rd_slice_t *slice, size_t offset) {
        const rd_segment_t *seg;
        size_t absof;

        /* Compare against the slice length instead of computing
         * start + offset first, which could wrap for huge offsets and
         * slip past the bounds check. */
        if (unlikely(offset >= slice->end - slice->start))
                return -1;

        absof = slice->start + offset;

        /* Locate the segment holding absof, using the current read
         * segment as a search hint. */
        seg = rd_buf_get_segment_at_offset(slice->buf, slice->seg, absof);
        rd_assert(seg);

        slice->seg = seg;
        slice->rof = absof - seg->seg_absof;
        rd_assert(seg->seg_absof + slice->rof >= slice->start &&
                  seg->seg_absof + slice->rof <= slice->end);

        return 0;
}

Librdkafka源码深入分析-Content Table

相关文章