diff options
Diffstat (limited to 'lib/sql.c')
-rw-r--r-- | lib/sql.c | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/lib/sql.c b/lib/sql.c new file mode 100644 index 0000000..8ac4dbd --- /dev/null +++ b/lib/sql.c @@ -0,0 +1,276 @@ +#include "sql.h" +#include "collect.h" +#include "extract.h" +#include "util.h" +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +static inline int _db_handle_result(sqlite3 *db, int rc, const char *errmsg, + const char *err, bool fatal) { + if (SQLITE_OK != rc) { + ERROR("sqlite3: %s(%i): %s\n", errmsg ? errmsg : "", rc, + err ? err : sqlite3_errmsg(db)); + if (fatal) { + sqlite3_close(db); + exit(1); + } + } + return rc; +} + +static inline int _db_exec(sqlite3 *db, const char *cmd, const char *errmsg, + bool fatal) { + char *err = NULL; + int rc = sqlite3_exec(db, cmd, NULL, NULL, &err); + return _db_handle_result(db, rc, errmsg, err, fatal); +} + +static inline int db_exec_fatal(sqlite3 *db, const char *cmd, + const char *errmsg) { + return _db_exec(db, cmd, errmsg, true); +} + +static inline int db_exec(sqlite3 *db, const char *cmd, const char *errmsg) { + return _db_exec(db, cmd, errmsg, false); +} + +static inline int db_prepare(sqlite3 *db, const char *cmd, const char *errmsg, + sqlite3_stmt **stmt) { + int rc = sqlite3_prepare_v2(db, cmd, -1, stmt, 0); + return _db_handle_result(db, rc, errmsg, NULL, true); +} + +int db_set_pragma(sqlite3 *db) { + return db_exec_fatal(db, + "PRAGMA journal_mode=WAL;" + "PRAGMA foreign_keys = ON;", + "Can't set Sqlite3 PRAGMA"); +} + +int db_vacuum(sqlite3 *db) { + return db_exec(db, "VACUUM", "Can't vacuum database"); +} + +int db_create_table(sqlite3 *db) { + const char *create_sql = + "CREATE TABLE IF NOT EXISTS " g_sqlite_table_data " (" + "id INTEGER PRIMARY KEY," + "data BLOB" + ");" + "CREATE TABLE IF NOT EXISTS " g_sqlite_table_header " (" + "id INTEGER PRIMARY KEY," + "nr_entries INTEGER," + "size INTEGER," + "compression_type INTEGER," + "start_time INTEGER," + "end_time INTEGER," + "data_id INTEGER," + "FOREIGN KEY(data_id) REFERENCES " g_sqlite_table_data + "(id) ON DELETE SET NULL" + ");"; + int rc = 0, retry = g_sqlite_nr_fail_retry; + while (retry--) { + rc = db_exec(db, create_sql, "Can't create table"); + if (SQLITE_LOCKED != rc && SQLITE_BUSY != rc) + return rc; + sleep(1); + } + + ERROR("Can't create table, reach max retry, bailed out!"); + exit(1); +} + +int db_open(sqlite3 **db, const char *dbname) { + int rc; + rc = sqlite3_open(dbname, db); + if (SQLITE_OK != rc) { + ERROR("Can't open database %s (%i): %s\n", dbname, rc, + sqlite3_errmsg(*db)); + exit(1); + } + + return db_set_pragma(*db); +} + +int db_close(sqlite3 *db) { + sqlite3_close(db); + return 0; +} + +int db_insert(sqlite3 *db, const Header *header, const Entry *entries) { + int rc; + sqlite3_stmt *stmt[2] = {0}; + const char *insert_sql[] = { + "INSERT INTO " g_sqlite_table_data " (data) VALUES(?)", + "INSERT INTO " g_sqlite_table_header " " + "(nr_entries, size, compression_type, start_time, end_time, data_id) " + "VALUES(?, ?, ?, ?, ?, ?)"}; + + db_exec_fatal(db, "BEGIN TRANSACTION", "db_insert: Can't begin txn"); + for (int i = 0; i < 2;) { + rc = db_prepare(db, insert_sql[i], "Can't insert data", &stmt[i]); + if (i == 0) { + printf("Inserting raw data: %02X:%02X", ((char *)entries)[0], + ((char *)entries)[1]); + sqlite3_bind_blob(stmt[i], 1, entries, header->raw_size, + SQLITE_STATIC); + } else { + sqlite3_int64 data_id = sqlite3_last_insert_rowid(db); + sqlite3_bind_int(stmt[i], 1, header->nr_entries); + sqlite3_bind_int(stmt[i], 2, header->raw_size); + sqlite3_bind_int(stmt[i], 3, header->compression_type); + sqlite3_bind_int64(stmt[i], 4, header->start_time); + sqlite3_bind_int64(stmt[i], 5, header->end_time); + sqlite3_bind_int64(stmt[i], 6, data_id); + } + + rc = sqlite3_step(stmt[i]); + if (rc != SQLITE_DONE) + WARN("sqlite3: Insert data step fail: %d\n", rc); + + if (SQLITE_SCHEMA == sqlite3_finalize(stmt[i])) + continue; + i++; + } + + DEBUG("Inserted #%d of compressed size %d", header->nr_entries, + header->raw_size); + db_exec_fatal(db, "END TRANSACTION", "db_insert: Can't end txn"); + return rc; +} + +int db_read_data_by_timerange(sqlite3 *db, const Timerange *t, + StateCallback cb) { + const char *_select_sql = + "SELECT * FROM " g_sqlite_table_header + " INNER JOIN " g_sqlite_table_data " ON " g_sqlite_table_header + ".data_id = " g_sqlite_table_data ".id" + " WHERE " g_sqlite_table_header + ".end_time > %ld AND " g_sqlite_table_header ".start_time < %ld"; + char select_sql[strlen(_select_sql) + 25]; + sprintf(select_sql, _select_sql, t->from, t->until); + + sqlite3_stmt *stmt; + db_exec_fatal(db, "BEGIN TRANSACTION", "db_delete: Can't begin txn"); + int rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, 0); + if (rc != SQLITE_OK) { + ERROR("Can't select (%i): %s\n", rc, sqlite3_errmsg(db)); + sqlite3_close(db); + exit(1); + } + + int count = 0; + for (;; count++) { + rc = sqlite3_step(stmt); + if (rc == SQLITE_DONE) + break; + assert(rc == SQLITE_ROW); + + State *s = malloc(sizeof(State)); + s->header = malloc(sizeof(Header)); + s->header->nr_entries = sqlite3_column_int(stmt, 1); + s->header->raw_size = sqlite3_column_int(stmt, 2); + s->header->compression_type = sqlite3_column_int(stmt, 3); + + size_t size = sqlite3_column_bytes(stmt, 8); + DEBUG("extract: nr_entries: %d " + "raw_size: %d " + "compression_type: %d " + "size: %ld", + s->header->nr_entries, s->header->raw_size, + s->header->compression_type, size); + if (size != (size_t)s->header->raw_size) + FATAL("extract: header data size and actual size not match: " + "expected: %u, got: %ld", + s->header->raw_size, size); + + bool ok = extract(s, sqlite3_column_blob(stmt, 8)); + if (ok) + cb(s, t); + state_free(s); + } + + assert(SQLITE_SCHEMA != sqlite3_finalize(stmt)); + db_exec_fatal(db, "END TRANSACTION", "db_begin: Can't end txn"); + + return count; +} + +int db_get_space_consumed(sqlite3 *db) { + const char *select_data_sql = "SELECT SUM(size) FROM " g_sqlite_table_data; + const char *select_header_sql = + "SELECT COUNT(*) FROM " g_sqlite_table_header; + + int size; + sqlite3_stmt *stmt = NULL; + db_prepare(db, select_data_sql, "Can't query data", &stmt); + size = sqlite3_column_int64(stmt, 1); + + db_prepare(db, select_header_sql, "Can't query data", &stmt); + size += sizeof(Header) * sqlite3_column_int64(stmt, 1); + return size; +} + +int db_delete_oldest_bytes(sqlite3 *db, int64_t bytes) { + int rc; + sqlite3_stmt *stmt; + const char *select_sql = + "SELECT size, end_time, data_id " + "FROM " g_sqlite_table_header " WHERE data_id IS NOT NULL " + "ORDER BY end_time"; + if (!bytes) + return 0; + + db_exec_fatal(db, "BEGIN TRANSACTION", "db_delete: Can't begin txn"); + rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, 0); + if (rc != SQLITE_OK) { + ERROR("Can't select (%i): %s\n", rc, sqlite3_errmsg(db)); + sqlite3_close(db); + exit(1); + } + + int count = 0; + size_t bufsize = 1024; + char *buf = malloc(bufsize); + + while (bytes >= 0) { + rc = sqlite3_step(stmt); + if (rc == SQLITE_DONE) + break; + assert(rc == SQLITE_ROW); + sqlite3_int64 index = sqlite3_column_int64(stmt, 2); + int size = sqlite3_column_int(stmt, 0); + + char _buf[22]; + sprintf(_buf, count ? "%lld" : ",%lld", index); + while (strlen(_buf) + strlen(buf) + 2 >= bufsize) { + bufsize *= 2; + char *__buf = malloc(bufsize); + memcpy(__buf, buf, strlen(buf) + 1); + free(buf); + buf = __buf; + } + + strcat(buf, _buf); + bytes -= size; + count++; + } + + if (!*buf) + return 0; + + const char *_delete_sql = + "DELETE FROM " g_sqlite_table_data " WHERE id in (%s);"; + char *delete_sql = malloc(strlen(_delete_sql) + strlen(buf) + 1); + sprintf(delete_sql, _delete_sql, buf); + db_exec_fatal(db, delete_sql, "Can't delete"); + DEBUG("Deleted old data, SQL: %s", delete_sql); + free(delete_sql); + free(buf); + + assert(SQLITE_SCHEMA != sqlite3_finalize(stmt)); + db_exec_fatal(db, "END TRANSACTION", "db_begin: Can't end txn"); + + return count; +} |