From cccca51788db48503545d32bc9c3b952cd89b687 Mon Sep 17 00:00:00 2001 From: Vitalie Lazu Date: Sat, 13 Mar 2021 08:06:53 +0200 Subject: [PATCH] pg: support for copy sql commands (#9272) --- vlib/pg/pg.v | 102 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 93 insertions(+), 9 deletions(-) diff --git a/vlib/pg/pg.v b/vlib/pg/pg.v index 296b868d49..cb83d05916 100644 --- a/vlib/pg/pg.v +++ b/vlib/pg/pg.v @@ -1,11 +1,17 @@ module pg +import io + #flag -lpq #flag linux -I/usr/include/postgresql #flag darwin -I/opt/local/include/postgresql11 #flag windows -I @VROOT/thirdparty/pg/include #flag windows -L @VROOT/thirdparty/pg/win64 + +// PostgreSQL Source Code +// https://doxygen.postgresql.org/libpq-fe_8h.html #include + pub struct DB { mut: conn &C.PGconn @@ -16,7 +22,8 @@ pub mut: vals []string } -struct C.PGResult +struct C.PGResult { +} pub struct Config { pub: @@ -41,12 +48,25 @@ fn C.PQntuples(&C.PGResult) int fn C.PQnfields(&C.PGResult) int -fn C.PQexec(voidptr) &C.PGResult +fn C.PQexec(voidptr, byteptr) &C.PGResult -fn C.PQexecParams(voidptr) &C.PGResult +// Params: +// const Oid *paramTypes +// const char *const *paramValues +// const int *paramLengths +// const int *paramFormats +fn C.PQexecParams(conn voidptr, command byteptr, nParams int, paramTypes int, paramValues byteptr, paramLengths int, paramFormats int, resultFormat int) &C.PGResult + +fn C.PQputCopyData(conn voidptr, buffer byteptr, nbytes int) int + +fn C.PQputCopyEnd(voidptr, int) int + +fn C.PQgetCopyData(conn voidptr, buffer &byteptr, async int) int fn C.PQclear(&C.PGResult) voidptr +fn C.PQfreemem(voidptr) + fn C.PQfinish(voidptr) // connect makes a new connection to the database server using @@ -63,7 +83,7 @@ pub fn connect(config Config) ?DB { // We force the construction of a new string as the // error message will be freed by the next `PQfinish` // call - c_error_msg := unsafe {C.PQerrorMessage(conn).vstring()} + c_error_msg := unsafe { C.PQerrorMessage(conn).vstring() } error_msg := '$c_error_msg' C.PQfinish(conn) return error('Connection to a PG database failed: $error_msg') @@ -82,7 +102,7 @@ fn res_to_rows(res voidptr) []Row { mut row := Row{} for j in 0 .. nr_cols { val := C.PQgetvalue(res, i, j) - sval := unsafe {val.vstring()} + sval := unsafe { val.vstring() } row.vals << sval } rows << row @@ -114,7 +134,7 @@ pub fn (db DB) q_int(query string) ?int { return val.int() } -// q_int submit a command to the database server and +// q_string submit a command to the database server and // returns an the first field in the first tuple // as a string. If no row is found or on // command failure, an error is returned @@ -154,7 +174,7 @@ fn rows_first_or_empty(rows []Row) ?Row { pub fn (db DB) exec_one(query string) ?Row { res := C.PQexec(db.conn, query.str) - e := unsafe {C.PQerrorMessage(db.conn).vstring()} + e := unsafe { C.PQerrorMessage(db.conn).vstring() } if e != '' { return error('pg exec error: "$e"') } @@ -164,7 +184,7 @@ pub fn (db DB) exec_one(query string) ?Row { // exec_param_many executes a query with the provided parameters pub fn (db DB) exec_param_many(query string, params []string) ?[]Row { - mut param_vals := []charptr{ len: params.len } + mut param_vals := []charptr{len: params.len} for i in 0 .. params.len { param_vals[i] = params[i].str } @@ -182,10 +202,74 @@ pub fn (db DB) exec_param(query string, param string) ?[]Row { } fn (db DB) handle_error_or_result(res voidptr, elabel string) ?[]Row { - e := unsafe {C.PQerrorMessage(db.conn).vstring()} + e := unsafe { C.PQerrorMessage(db.conn).vstring() } if e != '' { C.PQclear(res) return error('pg $elabel error:\n$e') } return res_to_rows(res) } + +// copy_expert execute COPY commands +// https://www.postgresql.org/docs/9.5/libpq-copy.html +pub fn (db DB) copy_expert(query string, file io.ReaderWriter) ?int { + res := C.PQexec(db.conn, query.str) + status := C.PQresultStatus(res) + + defer { + C.PQclear(res) + } + + e := unsafe { C.PQerrorMessage(db.conn).vstring() } + if e != '' { + return error('pg copy error:\n$e') + } + + if status == C.PGRES_COPY_IN { + mut buf := []byte{len: 4 * 1024} + for { + n := file.read(mut buf) or { + msg := 'pg copy error: Failed to read from input' + C.PQputCopyEnd(db.conn, msg.str) + return err + } + if n <= 0 { + break + } + + code := C.PQputCopyData(db.conn, buf.data, n) + if code == -1 { + return error('pg copy error: Failed to send data, code=$code') + } + } + + code := C.PQputCopyEnd(db.conn, 0) + + if code != 1 { + return error('pg copy error: Failed to finish copy command, code: $code') + } + } else if status == C.PGRES_COPY_OUT { + for { + address := byteptr(0) + n_bytes := C.PQgetCopyData(db.conn, &address, 0) + if n_bytes > 0 { + mut local_buf := []byte{len: n_bytes} + unsafe { C.memcpy(byteptr(local_buf.data), address, n_bytes) } + file.write(local_buf) or { + C.PQfreemem(address) + return err + } + } else if n_bytes == -1 { + break + } else if n_bytes == -2 { + // consult PQerrorMessage for the reason + return error('pg copy error: read error') + } + if address != 0 { + C.PQfreemem(address) + } + } + } + + return 0 +}