diff options
author | Leah Neukirchen <leah@vuxu.org> | 2023-10-15 19:17:27 +0200 |
---|---|---|
committer | Leah Neukirchen <leah@vuxu.org> | 2023-10-15 19:17:27 +0200 |
commit | f9ad993a1586436a539721f84058bf63055c8ac5 (patch) | |
tree | 18dd0d72901e83d70c12bb43a3b5c22a53935945 /mico-store.c | |
parent | 9ed51426baac47f3fe4b45cc7a018b9f6eeff2dd (diff) | |
download | mico-f9ad993a1586436a539721f84058bf63055c8ac5.tar.gz mico-f9ad993a1586436a539721f84058bf63055c8ac5.tar.xz mico-f9ad993a1586436a539721f84058bf63055c8ac5.zip |
wip
Diffstat (limited to 'mico-store.c')
-rw-r--r-- | mico-store.c | 231 |
1 files changed, 119 insertions, 112 deletions
diff --git a/mico-store.c b/mico-store.c index fec55bd..8bccad1 100644 --- a/mico-store.c +++ b/mico-store.c @@ -1,3 +1,4 @@ +#include <arpa/inet.h> #include <assert.h> #include <stdint.h> #include <stdio.h> @@ -8,6 +9,8 @@ #define STB_DS_IMPLEMENTATION #include "stb_ds.h" +#define MICO_HEADER "\211MiC\r\n\032\n" + struct event { int64_t t; double v; @@ -38,6 +41,7 @@ struct bitfile { FILE *output; char *mem; size_t memlen; + size_t nevents; uint64_t bitbuf; // value of bits in write buffer int bitcount; // number of bits in write buffer @@ -104,6 +108,94 @@ put1(struct bitfile *bf, int64_t v) putbits1_msb(bf, 32, v & 0xffffffff); } +static struct bitfile ts, vs; +char *filename; +zip_t *zip; + +void +reset_stream() +{ + printf("RESET\n"); + + ts = (struct bitfile){ 0 }; + vs = (struct bitfile){ 0 }; + ts.output = open_memstream(&ts.mem, &ts.memlen); + vs.output = open_memstream(&vs.mem, &vs.memlen); + + fwrite(MICO_HEADER, 1, 8, ts.output); + fwrite(MICO_HEADER, 1, 8, vs.output); + + putbits1_msb(&ts, 32, 0); + putbits1_msb(&ts, 32, 0); + putbits1_msb(&vs, 32, 0); + putbits1_msb(&vs, 32, 0); +} + +void +write_stream(char *name) +{ + putbits1_flush(&ts); + putbits1_flush(&vs); + + char prefix[255]; + char path[255]; + strcpy(prefix, name); + char *s = strchr(prefix, '{'); + if (!s) { + abort(); + } + *s = '/'; + s = strrchr(prefix, '}'); + if (s) + *s = 0; + + fprintf(stderr, "%s: %ld + %ld\n", name, ts.memlen, vs.memlen); + fprintf(stderr, "name=%s\n", path); + + /* patch in length */ + uint32_t h = htonl((uint32_t)(ts.nevents >> 32)); + uint32_t l = htonl((uint32_t)(ts.nevents & 0xffffffff)); + memcpy(ts.mem + 8, &h, sizeof h); + memcpy(ts.mem + 8 + sizeof h, &l, sizeof l); + memcpy(vs.mem + 8, &h, sizeof h); + memcpy(vs.mem + 8 + sizeof h, &l, sizeof l); + + // time_t mtime = metrics[m].value[0].t / 1000; + snprintf(path, sizeof path, "%s/time.dd", prefix); + + zip_source_t *buft = zip_source_buffer(zip, + ts.mem, ts.memlen, 0); + int jj= zip_file_add(zip, path, buft, ZIP_FL_ENC_UTF_8); + printf("index of %s = %d\n", path, jj); + if (jj < 0) { + printf("error adding file: %s\n", zip_strerror(zip)); + } + // zip_file_set_mtime(zip, jj, mtime, 0); + + snprintf(path, sizeof path, "%s/data.d", prefix); + + zip_source_t *bufv = zip_source_buffer(zip, + vs.mem, vs.memlen, 0); + jj= zip_file_add(zip, path, bufv, ZIP_FL_ENC_UTF_8); + printf("index of %s = %d\n", path, jj); + if (jj < 0) { + printf("error adding file: %s\n", zip_strerror(zip)); + } + // zip_file_set_mtime(zip, jj, mtime, 0); + + // note that data must be kept in memory until zip_close + // actually writes the archive. + + // close and reopen the zip to force write and free all buffers + zip_close(zip); + zip = zip_open(filename, 0, 0); + if (!zip) + exit(-1); + + free(ts.mem); + free(vs.mem); +} + int main(int argc, char *argv[]) { @@ -115,23 +207,29 @@ main(int argc, char *argv[]) char *line = 0; size_t linebuflen = 0; ssize_t linelen = 0; - int i = 0; if (argc < 2) { fprintf(stderr, "usage: %s output.zip\n", argv[0]); exit(-1); } + filename = argv[1]; + zip = zip_open(filename, ZIP_CREATE | ZIP_TRUNCATE, 0); + if (!zip) + exit(-1); + + int64_t t, pt = 0, ppt = 0; + double v, pv = 0.0; + while ((linelen = getdelim(&line, &linebuflen, '\n', stdin)) >= 0) { char *start = strchr(line, ' '); if (!start) break; char *start2 = strchr(start+1, ' '); if (!start2) break; - struct event ev; char *eend; - ev.v = strtod(start, &eend); - ev.t = strtoll(eend+1, 0, 10); + v = strtod(start, &eend); + t = strtoll(eend+1, 0, 10); if (start - line > (ssize_t)sizeof name) { fprintf(stderr, "metric too long: %s\n", line); @@ -140,121 +238,30 @@ main(int argc, char *argv[]) memcpy(name, line, start-line); name[start-line] = 0; - // XXX normalize name; + printf("LINE |%s| |%s| |%s|\n", line, name, lastname); - // ts = ts / 1000; // XXX - - if (strcmp(name, lastname) == 0) { - // same name, skip hash lookup, use old i - } else { - if (shgeti(metrics, name) < 0) - shput(metrics, name, 0); - i = shgeti(metrics, name); + if (strcmp(name, lastname) != 0) { + if (*lastname) + write_stream(lastname); + reset_stream(); + pt = ppt = 0; + pv = 0.0; } - arrput(metrics[i].value, ev); + put1(&ts, (t - pt) - (pt - ppt)); + put1(&vs, (int64_t)((v - pv) * 10000)); + ts.nevents++; + vs.nevents++; - strcpy(lastname, name); - } + ppt = pt; + pt = t; + pv = v; - printf("metrics: %ld\n", shlen(metrics)); - for (int i = 0; i < shlen(metrics); i++) { - printf("%s %ld\n", - metrics[i].key, - arrlen(metrics[i].value)); + strcpy(lastname, name); } - zip_t *zip = zip_open(argv[1], ZIP_CREATE | ZIP_TRUNCATE, 0); - if (!zip) - exit(-1); - - for (int m = 0; m < shlen(metrics); m++) { - // ensure events are ordered by size - qsort(metrics[m].value, arrlen(metrics[m].value), - sizeof (struct event), timecmp); - - struct bitfile ts = { 0 }; - struct bitfile vs = { 0 }; - ts.output = open_memstream(&ts.mem, &ts.memlen); - vs.output = open_memstream(&vs.mem, &vs.memlen); - - int64_t t, pt = 0, ppt = 0; - double v, pv = 0.0; - - put1(&ts, arrlen(metrics[m].value)); - put1(&vs, arrlen(metrics[m].value)); - - for (ssize_t i = 0; i < arrlen(metrics[m].value); i++) { - t = metrics[m].value[i].t; - v = metrics[m].value[i].v; - - if (t == pt) { - fprintf(stderr, "duplicate timestamp: %ld", t); - continue; - } - - put1(&ts, (t - pt) - (pt - ppt)); - put1(&vs, (int64_t)((v - pv) * 10000)); - - ppt = pt; - pt = t; - pv = v; - } - - putbits1_flush(&ts); - putbits1_flush(&vs); - - printf("%s: %ld + %ld\n", metrics[m].key, - ts.memlen, vs.memlen); - - char prefix[255]; - char path[255]; - strcpy(prefix, metrics[m].key); - char *s = strchr(prefix, '{'); - if (!s) { - abort(); - } - *s = '/'; - s = strrchr(prefix, '}'); - if (s) - *s = 0; - - time_t mtime = metrics[m].value[0].t / 1000; - snprintf(path, sizeof path, "%s/time.dd", prefix); - - zip_source_t *buft = zip_source_buffer(zip, - ts.mem, ts.memlen, 0); - int jj= zip_file_add(zip, path, buft, ZIP_FL_ENC_UTF_8); - printf("index of %s = %d\n", path, jj); - if (jj < 0) { - printf("error adding file: %s\n", zip_strerror(zip)); - } - zip_file_set_mtime(zip, jj, mtime, 0); - - snprintf(path, sizeof path, "%s/data.d", prefix); - - zip_source_t *bufv = zip_source_buffer(zip, - vs.mem, vs.memlen, 0); - jj= zip_file_add(zip, path, bufv, ZIP_FL_ENC_UTF_8); - printf("index of %s = %d\n", path, jj); - if (jj < 0) { - printf("error adding file: %s\n", zip_strerror(zip)); - } - zip_file_set_mtime(zip, jj, mtime, 0); - - // note that data must be kept in memory until zip_close - // actually writes the archive. - - // close and reopen the zip to force write and free all buffers - zip_close(zip); - zip = zip_open(argv[1], 0, 0); - if (!zip) - exit(-1); - - arrfree(metrics[m].value); - free(ts.mem); - free(vs.mem); - } + if (*name) + write_stream(name); zip_close(zip); } |