diff options
-rw-r--r-- | mico-dump.c | 52 | ||||
-rw-r--r-- | mico-store.c | 231 |
2 files changed, 165 insertions, 118 deletions
diff --git a/mico-dump.c b/mico-dump.c index cd80609..467db45 100644 --- a/mico-dump.c +++ b/mico-dump.c @@ -1,3 +1,4 @@ +#include <arpa/inet.h> #include <assert.h> #include <stdint.h> #include <stdio.h> @@ -6,6 +7,8 @@ #include <zip.h> +#define MICO_HEADER "\211MiC\r\n\032\n" + zip_t *zip; struct bitreader { @@ -81,6 +84,8 @@ get1(struct bitreader *input) return v; } +#define ntohll(x) ((1==ntohl(1)) ? (x) : ((uint64_t)ntohl((x) & 0xFFFFFFFF) << 32) | ntohl((x) >> 32)) + int main(int argc, char *argv[]) { @@ -108,18 +113,53 @@ main(int argc, char *argv[]) *s++ = '}'; *s = 0; - int64_t t = 0, td = 0; - double v = 0.0; + char header[8]; + uint64_t len, lenv; + int ret = zip_fread(ts.input, &header, sizeof header); + if (ret < 0) { + printf("err: %s", zip_strerror(zip)); + } + + if (memcmp(header, MICO_HEADER, 8) != 0) { + printf("wrong header\n"); + continue; + } + + ret = zip_fread(ts.input, &len, sizeof len); + if (ret < 0) { + printf("err: %s", zip_strerror(zip)); + } + + len = ntohll(len); - int64_t len = get1(&ts); - int64_t lenv = get1(&vs); + ret = zip_fread(vs.input, &header, sizeof header); + if (ret < 0) { + printf("err: %s", zip_strerror(zip)); + } + + if (memcmp(header, MICO_HEADER, 8) != 0) { + printf("wrong header\n"); + continue; + } + + ret = zip_fread(vs.input, &lenv, sizeof lenv); + if (ret < 0) { + printf("err: %s", zip_strerror(zip)); + } + + lenv = ntohll(lenv); + + printf("len=%ld lenv=%d\n", len, lenv); if (len != lenv) { - fprintf(stderr, "time and value length don't agree"); + fprintf(stderr, "time and value length don't agree\n"); exit(-1); } - for (int64_t j = 0; j < len; j++) { + int64_t t = 0, td = 0; + double v = 0.0; + + for (uint64_t j = 0; j < len; j++) { int64_t tdd = get1(&ts); int64_t vd = get1(&vs); diff --git a/mico-store.c b/mico-store.c index fec55bd..8bccad1 100644 --- a/mico-store.c +++ b/mico-store.c @@ -1,3 +1,4 @@ +#include <arpa/inet.h> #include <assert.h> #include <stdint.h> #include <stdio.h> @@ -8,6 +9,8 @@ #define STB_DS_IMPLEMENTATION #include "stb_ds.h" +#define MICO_HEADER "\211MiC\r\n\032\n" + struct event { int64_t t; double v; @@ -38,6 +41,7 @@ struct bitfile { FILE *output; char *mem; size_t memlen; + size_t nevents; uint64_t bitbuf; // value of bits in write buffer int bitcount; // number of bits in write buffer @@ -104,6 +108,94 @@ put1(struct bitfile *bf, int64_t v) putbits1_msb(bf, 32, v & 0xffffffff); } +static struct bitfile ts, vs; +char *filename; +zip_t *zip; + +void +reset_stream() +{ + printf("RESET\n"); + + ts = (struct bitfile){ 0 }; + vs = (struct bitfile){ 0 }; + ts.output = open_memstream(&ts.mem, &ts.memlen); + vs.output = open_memstream(&vs.mem, &vs.memlen); + + fwrite(MICO_HEADER, 1, 8, ts.output); + fwrite(MICO_HEADER, 1, 8, vs.output); + + putbits1_msb(&ts, 32, 0); + putbits1_msb(&ts, 32, 0); + putbits1_msb(&vs, 32, 0); + putbits1_msb(&vs, 32, 0); +} + +void +write_stream(char *name) +{ + putbits1_flush(&ts); + putbits1_flush(&vs); + + char prefix[255]; + char path[255]; + strcpy(prefix, name); + char *s = strchr(prefix, '{'); + if (!s) { + abort(); + } + *s = '/'; + s = strrchr(prefix, '}'); + if (s) + *s = 0; + + fprintf(stderr, "%s: %ld + %ld\n", name, ts.memlen, vs.memlen); + fprintf(stderr, "name=%s\n", path); + + /* patch in length */ + uint32_t h = htonl((uint32_t)(ts.nevents >> 32)); + uint32_t l = htonl((uint32_t)(ts.nevents & 0xffffffff)); + memcpy(ts.mem + 8, &h, sizeof h); + memcpy(ts.mem + 8 + sizeof h, &l, sizeof l); + memcpy(vs.mem + 8, &h, sizeof h); + memcpy(vs.mem + 8 + sizeof h, &l, sizeof l); + + // time_t mtime = metrics[m].value[0].t / 1000; + snprintf(path, sizeof path, "%s/time.dd", prefix); + + zip_source_t *buft = zip_source_buffer(zip, + ts.mem, ts.memlen, 0); + int jj= zip_file_add(zip, path, buft, ZIP_FL_ENC_UTF_8); + printf("index of %s = %d\n", path, jj); + if (jj < 0) { + printf("error adding file: %s\n", zip_strerror(zip)); + } + // zip_file_set_mtime(zip, jj, mtime, 0); + + snprintf(path, sizeof path, "%s/data.d", prefix); + + zip_source_t *bufv = zip_source_buffer(zip, + vs.mem, vs.memlen, 0); + jj= zip_file_add(zip, path, bufv, ZIP_FL_ENC_UTF_8); + printf("index of %s = %d\n", path, jj); + if (jj < 0) { + printf("error adding file: %s\n", zip_strerror(zip)); + } + // zip_file_set_mtime(zip, jj, mtime, 0); + + // note that data must be kept in memory until zip_close + // actually writes the archive. + + // close and reopen the zip to force write and free all buffers + zip_close(zip); + zip = zip_open(filename, 0, 0); + if (!zip) + exit(-1); + + free(ts.mem); + free(vs.mem); +} + int main(int argc, char *argv[]) { @@ -115,23 +207,29 @@ main(int argc, char *argv[]) char *line = 0; size_t linebuflen = 0; ssize_t linelen = 0; - int i = 0; if (argc < 2) { fprintf(stderr, "usage: %s output.zip\n", argv[0]); exit(-1); } + filename = argv[1]; + zip = zip_open(filename, ZIP_CREATE | ZIP_TRUNCATE, 0); + if (!zip) + exit(-1); + + int64_t t, pt = 0, ppt = 0; + double v, pv = 0.0; + while ((linelen = getdelim(&line, &linebuflen, '\n', stdin)) >= 0) { char *start = strchr(line, ' '); if (!start) break; char *start2 = strchr(start+1, ' '); if (!start2) break; - struct event ev; char *eend; - ev.v = strtod(start, &eend); - ev.t = strtoll(eend+1, 0, 10); + v = strtod(start, &eend); + t = strtoll(eend+1, 0, 10); if (start - line > (ssize_t)sizeof name) { fprintf(stderr, "metric too long: %s\n", line); @@ -140,121 +238,30 @@ main(int argc, char *argv[]) memcpy(name, line, start-line); name[start-line] = 0; - // XXX normalize name; + printf("LINE |%s| |%s| |%s|\n", line, name, lastname); - // ts = ts / 1000; // XXX - - if (strcmp(name, lastname) == 0) { - // same name, skip hash lookup, use old i - } else { - if (shgeti(metrics, name) < 0) - shput(metrics, name, 0); - i = shgeti(metrics, name); + if (strcmp(name, lastname) != 0) { + if (*lastname) + write_stream(lastname); + reset_stream(); + pt = ppt = 0; + pv = 0.0; } - arrput(metrics[i].value, ev); + put1(&ts, (t - pt) - (pt - ppt)); + put1(&vs, (int64_t)((v - pv) * 10000)); + ts.nevents++; + vs.nevents++; - strcpy(lastname, name); - } + ppt = pt; + pt = t; + pv = v; - printf("metrics: %ld\n", shlen(metrics)); - for (int i = 0; i < shlen(metrics); i++) { - printf("%s %ld\n", - metrics[i].key, - arrlen(metrics[i].value)); + strcpy(lastname, name); } - zip_t *zip = zip_open(argv[1], ZIP_CREATE | ZIP_TRUNCATE, 0); - if (!zip) - exit(-1); - - for (int m = 0; m < shlen(metrics); m++) { - // ensure events are ordered by size - qsort(metrics[m].value, arrlen(metrics[m].value), - sizeof (struct event), timecmp); - - struct bitfile ts = { 0 }; - struct bitfile vs = { 0 }; - ts.output = open_memstream(&ts.mem, &ts.memlen); - vs.output = open_memstream(&vs.mem, &vs.memlen); - - int64_t t, pt = 0, ppt = 0; - double v, pv = 0.0; - - put1(&ts, arrlen(metrics[m].value)); - put1(&vs, arrlen(metrics[m].value)); - - for (ssize_t i = 0; i < arrlen(metrics[m].value); i++) { - t = metrics[m].value[i].t; - v = metrics[m].value[i].v; - - if (t == pt) { - fprintf(stderr, "duplicate timestamp: %ld", t); - continue; - } - - put1(&ts, (t - pt) - (pt - ppt)); - put1(&vs, (int64_t)((v - pv) * 10000)); - - ppt = pt; - pt = t; - pv = v; - } - - putbits1_flush(&ts); - putbits1_flush(&vs); - - printf("%s: %ld + %ld\n", metrics[m].key, - ts.memlen, vs.memlen); - - char prefix[255]; - char path[255]; - strcpy(prefix, metrics[m].key); - char *s = strchr(prefix, '{'); - if (!s) { - abort(); - } - *s = '/'; - s = strrchr(prefix, '}'); - if (s) - *s = 0; - - time_t mtime = metrics[m].value[0].t / 1000; - snprintf(path, sizeof path, "%s/time.dd", prefix); - - zip_source_t *buft = zip_source_buffer(zip, - ts.mem, ts.memlen, 0); - int jj= zip_file_add(zip, path, buft, ZIP_FL_ENC_UTF_8); - printf("index of %s = %d\n", path, jj); - if (jj < 0) { - printf("error adding file: %s\n", zip_strerror(zip)); - } - zip_file_set_mtime(zip, jj, mtime, 0); - - snprintf(path, sizeof path, "%s/data.d", prefix); - - zip_source_t *bufv = zip_source_buffer(zip, - vs.mem, vs.memlen, 0); - jj= zip_file_add(zip, path, bufv, ZIP_FL_ENC_UTF_8); - printf("index of %s = %d\n", path, jj); - if (jj < 0) { - printf("error adding file: %s\n", zip_strerror(zip)); - } - zip_file_set_mtime(zip, jj, mtime, 0); - - // note that data must be kept in memory until zip_close - // actually writes the archive. - - // close and reopen the zip to force write and free all buffers - zip_close(zip); - zip = zip_open(argv[1], 0, 0); - if (!zip) - exit(-1); - - arrfree(metrics[m].value); - free(ts.mem); - free(vs.mem); - } + if (*name) + write_stream(name); zip_close(zip); } |