数据序列化与反序列化开发ByteArray
to be continued...
1. 准备:Protobuf
概念:Protobuf(Google Protocol Buffers)是谷歌提供的一个具有搞笑的协议数据交换格式的工具库(类似Json数据格式),时间和空间效率上都比Json要好一些。目前还不是太流行,仅支持C++、JAVA、python语言的开发。
重点关注:ProtoBuf中数据序列与反序列的规则
protobuf把消息结果message也是通过 key-value键值对来表示。只是其中的key是采取一定的算法计算出来的即通过每个message中每个字段(field index)和字段的数据类型(type)进行运算得来:
key = (index << 3) | type
field index = key >> 3
type = key & 0x111
type类型的对应关系如下:
type | mean | used for |
---|---|---|
0 | varint | int32, int64, uint32, uint64, sint32, sint64, bool, enum |
1 | 64-bit | fixed64, sfixed64, double |
2 | length-delimited | string, bytes, embedded messages, packed repeated fields |
3 | start group | groups (deprecated) |
4 | end group | groups (deprecated) |
5 | 32-bit | fixed32, sfixed32, float |
2. 准备:varint编码
概念:varint是一种使用1字节至多字节序列化一个整数的方***根据一个整数的数值大小对应编码变长的存储字节大小。32int型数据经varint编码后需要1-5字节;64int型数据经varint编码后需要1-10字节。通常,在实际应用场景中,小数字的使用远多于大数字,varint编码能够起到很好的压缩效果,节省空间。
编码原理
除最后一个字节外,每一个字节的最高位都有特殊的意义,即:最高有效位(most significant bit --msb),msb = 1表明后面的数据还是属于当前数据;msb = 0表明这已经是当前数据已经结束。
每个字节的低7位用于以7位为一组存储数字的二进制补码表示,且按照小端字节序排列。
例子1,将数字uint32_t = 123456进行varint编码
123456 | 0000 0000 000|0 0001 11|10 0010 0|100 0000 ------二进制补码表示 4字节表示 | | 每7bit划分,添加msb重新编码,从低位到高位取,并反转排序 | 1|1000000 1|1000100 0|0000111 0xC0 0xC4 0x07 --------varint编码后的数据 3字节表示
例子2,将数字uint32_t = 665进行varint编码
665 | 0000 0000 0000 0000 00|00 0010 1|001 1001 ------二进制补码表示 4字节表示 | | 每7bit划分,添加msb重新编码,从低位到高位取,并反转排序 | 1|0011001 0|0000101 0X99 0X05 --------varint编码后的数据 2字节表示
例子3,将数字int32_t = -1进行varint编码(可以看到有时候对负数的压缩编码效率低下)
-1 | 1111 |1111 111|1 1111 11|11 1111 1|111 1111 ------二进制补码表示 4字节表示 | | 每7bit划分,添加msb重新编码,从低位到高位取,并反转排序 | 1|1111111 1|1111111 1|1111111 1|1111111 0|0001111 0xff 0xff 0xff 0xff 0x0f ------varint编码后的数据 5字节表示
例子4,将数字int32_t = -123进行varint编码(可以看到有时候对负数的压缩编码效率低下)
-123 | 1111 |1111 111|1 1111 11|11 1111 1|000 0101 ------二进制补码表示 4字节表示 | | 每7bit划分,添加msb重新编码,从低位到高位取,并反转排序 | 1|0000101 1|1111111 1|1111111 1|1111111 0|0001111 0x85 0xff 0xff 0xff 0x0f ------varint编码后的数据 5字节表示
3. ByteArray类代码实现
ByteArray是一个字节数组容器,提供基础类型的序列化与反序列化功能。ByteArray支持基础类型的序列化与反序列化功能,并且支持将序列化的结果写入文件,以及从文件中读取内容进行反序列化。ByteArray支持以下类型的序列化与反序列化:
- 固定长度的有符号/无符号8位、16位、32位、64位整数
- 不固定长度的有符号/无符号32位、64位整数
- float、double类型
- 字符串,包含字符串长度,长度范围支持16位、32位、64位。
- 字符串,不包含长度。
原因:网络编程中势必涉及到数据包的封装、传递等操作。将这些数据的序列化和反序列化操作抽象到一个类中来管理,有点类似"内存池"的管理,但不是真正的内存池。网络编程,还要注意数据包传递、组装还原时候字节序问题。
ByteArray的底层存储是固定大小的块称为Node,以链表形式组织。每次写入数据时,将数据写入到链表最后一个块中,如果最后一个块不足以容纳数据,则分配一个新的块并添加到链表结尾,再写入数据。ByteArray会记录当前的操作位置,每次写入数据时,该操作位置按写入大小往后偏移,如果要读取数据,则必须调用setPosition重新设置当前的操作位置。
3.1 字节序
当用户物理机字节序和网络字节序不同,进行一个转换。不同的物理机的自带字节序不一定和网络字节序相同。网络字节序为大端字节序,物理机不同厂商设置的字节序可能是大端也可能是小端。
3.1.1 使用SFINEA规则对不同位数据的位交换重载
SFINEA,替换失败并不是一个错误,即subsitution failure is not an error
通过enable_if实现,它的定义如下所示
template <bool, typename T=void> struct enable_if { }; template <typename T> struct enable_if<true, T> { using type = T; };
16bit、32bit、64bit的数据重载它们的高低位交换函数。使用enable_if与,使用if...else写法和这种模板函数重载效果是一样的
//8字节类型的字节序转化 template<class T> typename std::enable_if<sizeof(T) == sizeof(uint64_t), T>::type byteswap(T value){ return (T)bswap_64((uint64_t)value); } //4字节类型的字节序转化 template<class T> typename std::enable_if<sizeof(T) == sizeof(uint32_t), T>::type byteswap(T value){ return (T)bswap_32((uint32_t)value); } //2字节类型的字节序转化 template<class T> typename std::enable_if<sizeof(T) == sizeof(uint16_t), T>::type byteswap(T value){ return (T)bswap_16((uint16_t)value); }
网络服务器就要涉及网络字节序与主机字节序的转换,网络字节序一般是大端字节序,主机字节序一般是小端。
小端机器:收到大端字节序-------->小端字节序;发出:小端字节序------>大端字节序
大端机器:收到:一般不需要做改变;发出:一般不需要做改变
//根据用户物理机大小端存储,决定如何转换字节序 #if BYTE_ORDER == BIG_ENDIAN #define SYLAR_BYTE_ORDER SYLAR_BIG_ENDIAN #else #define SYLAR_BYTE_ORDER SYLAR_LITTLE_ENDIAN #endif #if SYLAR_BYTE_ORDER == SYLAR_BIG_ENDIAN //判断物理机大小端 //得到大端 大端机器 什么都不用操作 //只在小端机器上执行byteswap, 在大端机器上什么都不做 template<class T> T byteswapOnLittleEndian(T t){ return t; } //得到小端 大端机器 大端----->小端 //只在大端机器上执行byteswap, 在小端机器上什么都不做 template<class T> T byteswapOnBigEndian(T t){ return byteswap(t); } #else //得到大端 小端机器 小端------->大端 //只在小端机器上执行byteswap, 在大端机器上什么都不做 template<class T> T byteswapOnLittleEndian(T t){ return byteswap(t); } //得到小端 小端机器 什么都不用做 //只在大端机器上执行byteswap, 在小端机器上什么都不做 template<class T> T byteswapOnBigEndian(T t){ return t; } #endif
3.2 成员变量
class ByteArray{ public: struct Node { Node(size_t s); //构造指定大小的内存块 s: 内存块字节数 Node(); ~Node(); char* ptr; //内存块地址指针 Node* next; //下一个内存块地址 size_t size; //内存块大小 }; ··· ··· private: size_t m_baseSize; //内存块的大小 size_t m_position; //当前操作位置,N * m_baseSize + nowposition size_t m_capacity; //当前的总容量 size_t m_size; //当前数据的大小 int8_t m_endian; //字节序,默认大端 Node* m_root; //第一个内存块指针 Node* m_cur; //当前操作的内存块指针 }
3.3 将有符号数转换为无符号数
varint算法对负数压缩的效率很低下,符号位是1。将有符号的数都转为无符号数进行varint压缩。整数的转换值为原来的两倍,负数的转换值为原来的两倍再+1,转换后+x和-x是大小相邻相差1的两个数。
存入时候负---->正,取出时候恢复正----->负
//int32_t---------->uint32_t static uint32_t EncodeZigzag32(const int32_t& v){ //不转换的话 负数压缩会浪费空间 if(v < 0){ return ((uint32_t)(-v)) * 2 - 1; }else{ return v * 2; } } //int64_t---------->uint64_t static uint64_t EncodeZigzag64(const int64_t& v){ //不转换的话 -1压缩一定要消耗10个字节 if(v < 0){ return ((uint64_t)(-v)) * 2 - 1; }else{ return v * 2; } } //uint32_t---------->int32_t static int32_t DecodeZigzag32(const uint32_t& v){ //消除乘2 异或一下最后一位来恢复负数 return (v >> 1) ^ -(v & 1); //v>>1 除2 | v&1 通过判断v是偶数还是奇数来判断zigzag之前的正负 } //uint64_t---------->int64_t static int64_t DecodeZigzag64(const uint64_t& v){ //消除乘2 异或一下最后一位来恢复负数 return (v >> 1) ^ -(v & 1); }
3.4 读写函数
封装好读写函数,后面读取固定长度数据或者varint数据的时候可以复用。
写write()将缓冲区的内容写入到内存块中。
读read()将内存块的内容读取到缓冲区中。和write()一模一样的流程,读什么位置依赖于内存指针的位置,始终从内存指针m_position一直读到最后。
//向内存缓存buf中写入size长度的数据。m_position += size, 如果m_position > m_size 则 m_size = m_position void write(const void* buf, size_t size); //在内存缓存buf中读取size长度的数据 //m_position += size, 如果m_position > m_size 则 m_size = m_position。如果getReadSize() < size 则抛出 std::out_of_range void read(void* buf, size_t size); //在内存缓存buf中读取size长度的数据,从position开始读取。如果 (m_size - position) < size 则抛出 std::out_of_range void read(void* buf, size_t size, size_t position) const; void ByteArray::write(const void* buf, size_t size){ if(size == 0) return; addCapacity(size); //保险起见,先addCapacity size个字节 size_t npos = m_position % m_baseSize; //内存指针现在在内存块结点哪一个字节位置上 size_t ncap = m_cur->size - npos; //当前结点的剩余容量 size_t bpos = 0; //已经写入内存的数据量 while(size > 0){ if(ncap >= size){ //内存结点当前剩余容量能放下size的数据 memcpy(m_cur->ptr + npos, (const char*)buf + bpos, size); if(m_cur->size == (npos + size)) //正好把这一块填满 m_cur = m_cur->next; m_position += size; bpos += size; size = 0; }else{ //不够放 先把当前剩余空间写完 在下一个新结点继续写入 memcpy(m_cur->ptr + npos, (const char*)buf + bpos, ncap); //复制ncap个bytes从buf+bpos到m_cur->ptr+npos m_position += ncap; bpos += ncap; size -= ncap; //去遍历下一个内存块 m_cur = m_cur->next; ncap = m_cur->size; npos = 0; } } //如果内存指针超过了当前表示的已经使用的空间大小 更新一下 if(m_position > m_size) m_size = m_position; } //将内存块的内容读取到缓冲区中。和write()一模一样的流程,读什么位置依赖于内存指针的位置,始终从内存指针m_position一直读到最后 void ByteArray::read(void* buf, size_t size){ if(size > getReadSize()) //读取的长度超出可读范围要抛异常 throw std::out_of_range("not enough len"); size_t npos = m_position % m_baseSize; //内存指针现在在内存块结点哪一个字节位置上 size_t ncap = m_cur->size - npos; //当前结点剩余容量 size_t bpos = 0; //当前已经读取的数据量 while(size > 0){ if(ncap >= size){ memcpy((char*)buf + bpos, m_cur->ptr + npos, size); if(m_cur->size == (npos + size)) //如果当前结点被读完 m_cur = m_cur->next; m_position += size; bpos += size; size = 0; }else{ memcpy((char*)buf + bpos, m_cur->ptr + npos, ncap); m_position += ncap; bpos += ncap; size -= ncap; m_cur = m_cur->next; ncap = m_cur->size; npos = 0; } } } //将内存块的内容读取到缓冲区中,但不影响当前内存指针指向的位置,使用一个外部传入的内存指针position,而不使用当前真正的内存指针m_position。 //即:用户只关心存储的内容,而不关心是否移除内存中的内容,或许还要紧接着写入内容 void ByteArray::read(void* buf, size_t size, size_t position) const{ if(size > (m_size - position)) throw std::out_of_range("not enough len"); size_t npos = position % m_baseSize; size_t ncap = m_cur->size - npos; size_t bpos = 0; Node* cur = m_cur; while(size > 0){ if(ncap >= size){ memcpy((char*)buf + bpos, cur->ptr + npos, size); if(cur->size == (npos + size)) cur = cur->next; position += size; bpos += size; size = 0; }else{ memcpy((char*)buf + bpos, cur->ptr + npos, ncap); position += ncap; bpos += ncap; size -= ncap; cur = cur->next; ncap = cur->size; npos = 0; } } }
3.5 varint编码
读取固定长度的数据直接根据大小端,复用read和write即可,不详细叙述。
读
函数声明
调用前:
getReadSize() >= Varint数(16/32/64/float/double)实际占用内存,否则抛出 std::out_of_range
etReadSize() >= VarintString(16/32/64/float/double)+size 实际占用内存,否则抛出 std::out_of_range
调用后:
m_position += Varint数实际占用内存
m_position += VarintString实际占用内存
int32_t readInt32(); //读取有符号Varint32类型的数据 uint32_t readUint32(); //读取无符号Varint32类型的数据 int64_t readInt64(); //读取有符号Varint64类型的数据 uint64_t readUint64(); //读取无符号Varint64类型的数据 float readFloat(); //读取float类型的数据 double readDouble(); //读取double类型的数据 std::string readStringF16(); //读取std::string类型的数据,用uint16_t作为长度 std::string readStringF32(); //读取std::string类型的数据,用uint32_t作为长度 std::string readStringF64(); //读取std::string类型的数据,用uint64_t作为长度 std::string readStringVint(); //读取std::string类型的数据,用无符号Varint64作为长度
函数实现
int32_t ByteArray::readInt32(){ return DecodeZigzag32(readUint32()); } uint32_t ByteArray::readUint32(){ //最终得到一个uint32_t型数据 uint32_t result = 0; //max读取次数 = 32 / 7 + 1 组 for(int i = 0; i < 32; i += 7){ uint8_t b = readFuint8(); //一次读取8位 if(b < 0x80){ //msp==0 说明这是该数据的最后一个字节 result |= ((uint32_t)b) << i; break; }else{ //msp==1 说明后面还有字节没有取 要去掉头部的msp位 result |= (((uint32_t)(b & 0x7f)) << i); } } return result; } int64_t ByteArray::readInt64(){ return DecodeZigzag64(readUint64()); } uint64_t ByteArray::readUint64(){ //最终得到一个uint32_t型数据 uint64_t result = 0; //max读取次数 = 64 / 7 + 1 组 for(int i = 0; i < 64; i += 7){ uint8_t b = readFuint8(); if(b < 0x80){ //msp==0 说明这是该数据的最后一个字节 result |= ((uint64_t)b) << i; //最后一个字节直接或运算 不用去msp位 break; }else{ //msp==1 说明后面还有字节没有取 要去掉头部的msp位 result |= (((uint64_t)(b & 0x7f)) << i); } } return result; } float ByteArray::readFloat(){ uint32_t v = readFuint32(); float value; memcpy(&value, &v, sizeof(v)); return value; } double ByteArray::readDouble(){ uint64_t v = readFuint64(); double value; memcpy(&value, &v, sizeof(v)); return value; } std::string ByteArray::readStringF16(){ uint16_t len = readFuint16(); std::string buff; buff.resize(len); read(&buff[0], len); return buff; } std::string ByteArray::readStringF32(){ uint32_t len = readFuint32(); std::string buff; buff.resize(len); read(&buff[0], len); return buff; } std::string ByteArray::readStringF64(){ uint64_t len = readFuint64(); std::string buff; buff.resize(len); read(&buff[0], len); return buff; } std::string ByteArray::readStringVint(){ uint64_t len = readUint64(); std::string buff; buff.resize(len); read(&buff[0], len); return buff; }
写
函数声明
void writeInt32 (int32_t value); //写入有符号Varint32类型的数据,m_position += 实际占用内存(1 ~ 5) void writeUint32 (uint32_t value); //写入无符号Varint32类型的数据,m_position += 实际占用内存(1 ~ 5) void writeInt64 (int64_t value); //写入有符号Varint64类型的数据,m_position += 实际占用内存(1 ~ 10) void writeUint64 (uint64_t value); //写入无符号Varint64类型的数据,m_position += 实际占用内存(1 ~ 10) void writeFloat (float value); //写入float类型的数据,m_position += sizeof(value) void writeDouble (double value); //写入double类型的数据,m_position += sizeof(value) void writeStringF16(const std::string& value); //写入std::string类型的数据,用uint16_t作为长度类型,m_position += 2 + value.size() void writeStringF32(const std::string& value); //写入std::string类型的数据,用uint32_t作为长度类型,m_position += 4 + value.size() void writeStringF64(const std::string& value); //写入std::string类型的数据,用uint64_t作为长度类型,m_position += 8 + value.size() void writeStringVint(const std::string& value); //写入std::string类型的数据,用无符号Varint64作为长度类型,m_position += Varint64长度 + value.size() void writeStringWithoutLength(const std::string& value); //写入std::string类型的数据,无长度,m_position += value.size()
函数实现
void ByteArray::writeInt32 (int32_t value){ writeUint32(EncodeZigzag32(value)); } void ByteArray::writeUint32 (uint32_t value){ //uint32_t 压缩后1~5字节的大小 uint8_t tmp[5]; uint8_t i = 0; //varint编码 msp等于1 就认为数据还没读完 while(value >= 0x80){ //0x80 1000 0000 //取低7位 + msp==1 组成新的编码数据 tmp[i++] = (value & 0x7F) | 0x80; value >>= 7; } tmp[i++] = value; write(tmp, i); } void ByteArray::writeInt64 (int64_t value){ writeUint64(EncodeZigzag64(value)); } void ByteArray::writeUint64 (uint64_t value){ //uint64_t 压缩后1~10字节的大小 uint8_t tmp[10]; uint8_t i = 0; //varint编码 msp等于1 就认为数据还没读完 while(value >= 0x80){ //取低7位+msp==1 组成新的编码数据 tmp[i++] = (value & 0x7F) | 0x80; value >>= 7; } tmp[i++] = value; write(tmp, i); } void ByteArray::writeFloat (float value){ uint32_t v; memcpy(&v, &value, sizeof(value)); writeFuint32(v); } void ByteArray::writeDouble (double value){ uint64_t v; memcpy(&v, &value, sizeof(value)); writeFuint64(v); } void ByteArray::writeStringF16(const std::string& value){ writeFuint16(value.size()); write(value.c_str(), value.size()); } void ByteArray::writeStringF32(const std::string& value){ writeFuint32(value.size()); write(value.c_str(), value.size()); } void ByteArray::writeStringF64(const std::string& value){ writeFuint64(value.size()); write(value.c_str(), value.size()); } void ByteArray::writeStringVint(const std::string& value){ writeUint64(value.size()); write(value.c_str(), value.size()); } void ByteArray::writeStringWithoutLength(const std::string& value){ write(value.c_str(), value.size()); }#笔记#