/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

- #include "csvarchive.hh"
- #include <stdlib.h>
- using namespace hadoop;
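
/*
 * Reads characters until a CSV terminator is found. The field
 * separator ',' is consumed; '\n' (end of a top-level record) and
 * '}' (end of a nested record, vector, or map) are pushed back so
 * the caller can examine them.
 */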
static std::string readUptoTerminator(PushBackInStream& stream)
{
  std::string s;
  while (1) {
    char c;
    if (1 != stream.read(&c, 1)) {
      throw new IOException("Error in deserialization.");
    }
    if (c == ',' || c == '\n' || c == '}') {
      if (c != ',') {
        stream.pushBack(c);
      }
      break;
    }
    s.push_back(c);
  }
  return s;
}
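
/*
 * Scalar deserializers: each reads one CSV field and converts it with
 * the matching strto* routine. As with strtol itself, malformed
 * numeric input silently yields 0 rather than an error.
 */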
void hadoop::ICsvArchive::deserialize(int8_t& t, const char* tag)
{
  std::string s = readUptoTerminator(stream);
  t = (int8_t) strtol(s.c_str(), NULL, 10);
}

void hadoop::ICsvArchive::deserialize(bool& t, const char* tag)
{
  std::string s = readUptoTerminator(stream);
  t = (s == "T");
}

void hadoop::ICsvArchive::deserialize(int32_t& t, const char* tag)
{
  std::string s = readUptoTerminator(stream);
  t = strtol(s.c_str(), NULL, 10);
}

void hadoop::ICsvArchive::deserialize(int64_t& t, const char* tag)
{
  std::string s = readUptoTerminator(stream);
  t = strtoll(s.c_str(), NULL, 10);
}

void hadoop::ICsvArchive::deserialize(float& t, const char* tag)
{
  std::string s = readUptoTerminator(stream);
  t = strtof(s.c_str(), NULL);
}

void hadoop::ICsvArchive::deserialize(double& t, const char* tag)
{
  std::string s = readUptoTerminator(stream);
  t = strtod(s.c_str(), NULL);
}
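
/*
 * Replaces every occurrence of the escape sequence src in s with the
 * single character c. This undoes the percent-escaping applied by
 * OCsvArchive::serialize(const std::string&, const char*) below.
 */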
static void replaceAll(std::string& s, const char *src, char c)
{
  // Note: s must be taken by reference; the original pass-by-value
  // signature applied the replacements to a local copy and lost them.
  std::string::size_type pos = 0;
  while ((pos = s.find(src, pos)) != std::string::npos) {
    s.replace(pos, strlen(src), 1, c);
    pos += 1; // resume the search after the substituted character
  }
}

- void hadoop::ICsvArchive::deserialize(std::string& t, const char* tag)
- {
- t = readUptoTerminator(stream);
- if (t[0] != '\'') {
- throw new IOException("Errror deserializing string.");
- }
- t.erase(0, 1); /// erase first character
- replaceAll(t, "%0D", 0x0D);
- replaceAll(t, "%0A", 0x0A);
- replaceAll(t, "%7D", 0x7D);
- replaceAll(t, "%00", 0x00);
- replaceAll(t, "%2C", 0x2C);
- replaceAll(t, "%25", 0x25);
- }
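
/*
 * Buffers are encoded as '#' followed by two hex digits per byte, so
 * the payload after '#' must have even length. On return, len holds
 * the number of decoded bytes appended to t.
 */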
- void hadoop::ICsvArchive::deserialize(std::string& t, size_t& len, const char* tag)
- {
- std::string s = readUptoTerminator(stream);
- if (s[0] != '#') {
- throw new IOException("Errror deserializing buffer.");
- }
- s.erase(0, 1); /// erase first character
- len = s.length();
- if (len%2 == 1) { // len is guaranteed to be even
- throw new IOException("Errror deserializing buffer.");
- }
- len >> 1;
- for (size_t idx = 0; idx < len; idx++) {
- char buf[3];
- buf[0] = s[2*idx];
- buf[1] = s[2*idx+1];
- buf[2] = '\0';
- int i;
- if (1 != sscanf(buf, "%2x", &i)) {
- throw new IOException("Errror deserializing buffer.");
- }
- t.push_back((char) i);
- }
- len = t.length();
- }
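
/*
 * Record framing: a nested record (non-NULL tag) is wrapped in
 * "s{" ... "}", while a top-level record has no markers and is
 * terminated by a newline.
 */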
void hadoop::ICsvArchive::startRecord(Record& s, const char* tag)
{
  if (tag != NULL) {
    char mark[2];
    if (2 != stream.read(mark, 2)) {
      throw new IOException("Error deserializing record.");
    }
    if (mark[0] != 's' || mark[1] != '{') {
      throw new IOException("Error deserializing record.");
    }
  }
}

void hadoop::ICsvArchive::endRecord(Record& s, const char* tag)
{
  char mark;
  if (1 != stream.read(&mark, 1)) {
    throw new IOException("Error deserializing record.");
  }
  if (tag == NULL) {
    if (mark != '\n') {
      throw new IOException("Error deserializing record.");
    }
  } else if (mark != '}') {
    throw new IOException("Error deserializing record.");
  } else {
    readUptoTerminator(stream);
  }
}
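
/*
 * Vectors and maps are framed as "v{" ... "}" and "m{" ... "}". CSV
 * carries no element count, so the CsvIndex returned here (declared
 * in csvarchive.hh) is what callers use to detect the closing '}'.
 */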
Index* hadoop::ICsvArchive::startVector(const char* tag)
{
  char mark[2];
  if (2 != stream.read(mark, 2)) {
    throw new IOException("Error deserializing vector.");
  }
  if (mark[0] != 'v' || mark[1] != '{') {
    throw new IOException("Error deserializing vector.");
  }
  return new CsvIndex(stream);
}

void hadoop::ICsvArchive::endVector(Index* idx, const char* tag)
{
  delete idx;
  char mark;
  if (1 != stream.read(&mark, 1)) {
    throw new IOException("Error deserializing vector.");
  }
  if (mark != '}') {
    throw new IOException("Error deserializing vector.");
  }
  readUptoTerminator(stream);
}

Index* hadoop::ICsvArchive::startMap(const char* tag)
{
  char mark[2];
  if (2 != stream.read(mark, 2)) {
    throw new IOException("Error deserializing map.");
  }
  if (mark[0] != 'm' || mark[1] != '{') {
    throw new IOException("Error deserializing map.");
  }
  return new CsvIndex(stream);
}

void hadoop::ICsvArchive::endMap(Index* idx, const char* tag)
{
  delete idx;
  char mark;
  if (1 != stream.read(&mark, 1)) {
    throw new IOException("Error deserializing map.");
  }
  if (mark != '}') {
    throw new IOException("Error deserializing map.");
  }
  readUptoTerminator(stream);
}

hadoop::ICsvArchive::~ICsvArchive()
{
}
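
/*
 * Output side. printCommaUnlessFirst() emits the ',' separator before
 * every field except the first in the current scope; the isFirst flag
 * is reset by the start/end markers below.
 */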
void hadoop::OCsvArchive::serialize(int8_t t, const char* tag)
{
  printCommaUnlessFirst();
  char sval[5];
  sprintf(sval, "%d", t);
  stream.write(sval, strlen(sval));
}

void hadoop::OCsvArchive::serialize(bool t, const char* tag)
{
  printCommaUnlessFirst();
  const char *sval = t ? "T" : "F";
  stream.write(sval, 1);
}

void hadoop::OCsvArchive::serialize(int32_t t, const char* tag)
{
  printCommaUnlessFirst();
  char sval[128];
  sprintf(sval, "%d", t);
  stream.write(sval, strlen(sval));
}

void hadoop::OCsvArchive::serialize(int64_t t, const char* tag)
{
  printCommaUnlessFirst();
  char sval[128];
  sprintf(sval, "%lld", (long long) t); // cast: int64_t is not long long on every platform
  stream.write(sval, strlen(sval));
}

void hadoop::OCsvArchive::serialize(float t, const char* tag)
{
  printCommaUnlessFirst();
  char sval[128];
  sprintf(sval, "%f", t);
  stream.write(sval, strlen(sval));
}

void hadoop::OCsvArchive::serialize(double t, const char* tag)
{
  printCommaUnlessFirst();
  char sval[128];
  sprintf(sval, "%lf", t);
  stream.write(sval, strlen(sval));
}
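
/*
 * Strings are prefixed with a single quote and percent-escaped so the
 * payload can never contain a terminator: NUL, LF, CR, '%', ',' and
 * '}' become %00, %0A, %0D, %25, %2C and %7D respectively.
 */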
- void hadoop::OCsvArchive::serialize(const std::string& t, const char* tag)
- {
- printCommaUnlessFirst();
- stream.write("'",1);
- int len = t.length();
- for (int idx = 0; idx < len; idx++) {
- char c = t[idx];
- switch(c) {
- case '\0':
- stream.write("%00",3);
- break;
- case 0x0A:
- stream.write("%0A",3);
- break;
- case 0x0D:
- stream.write("%0D",3);
- break;
- case 0x25:
- stream.write("%25",3);
- break;
- case 0x2C:
- stream.write("%2C",3);
- break;
- case 0x7D:
- stream.write("%7D",3);
- break;
- default:
- stream.write(&c,1);
- break;
- }
- }
- }
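
/*
 * Buffers are written as '#' followed by two hex digits per byte; the
 * matching decoder is deserialize(std::string&, size_t&, const char*)
 * above.
 */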
- void hadoop::OCsvArchive::serialize(const std::string& t, size_t len, const char* tag)
- {
- printCommaUnlessFirst();
- stream.write("#",1);
- for(int idx = 0; idx < len; idx++) {
- uint8_t b = t[idx];
- char sval[3];
- sprintf(sval,"%2x",b);
- stream.write(sval, 2);
- }
- }
void hadoop::OCsvArchive::startRecord(const Record& s, const char* tag)
{
  printCommaUnlessFirst();
  if (tag != NULL && strlen(tag) != 0) {
    stream.write("s{", 2);
  }
  isFirst = true;
}

void hadoop::OCsvArchive::endRecord(const Record& s, const char* tag)
{
  if (tag == NULL || strlen(tag) == 0) {
    stream.write("\n", 1);
    isFirst = true;
  } else {
    stream.write("}", 1);
    isFirst = false;
  }
}

void hadoop::OCsvArchive::startVector(size_t len, const char* tag)
{
  printCommaUnlessFirst();
  stream.write("v{", 2);
  isFirst = true;
}

void hadoop::OCsvArchive::endVector(size_t len, const char* tag)
{
  stream.write("}", 1);
  isFirst = false;
}

void hadoop::OCsvArchive::startMap(size_t len, const char* tag)
{
  printCommaUnlessFirst();
  stream.write("m{", 2);
  isFirst = true;
}

void hadoop::OCsvArchive::endMap(size_t len, const char* tag)
{
  stream.write("}", 1);
  isFirst = false;
}

hadoop::OCsvArchive::~OCsvArchive()
{
}
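
/*
 * For reference, a sketch of the wire format these serializers produce
 * (the record layout is illustrative, not taken from any particular
 * .jr definition): a top-level record with fields
 *   int32 a = 42, string b = "hi,", record c { bool d = true }
 * would be written as
 *
 *   42,'hi%2C,s{T}
 *
 * followed by the newline emitted by the top-level endRecord().
 */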