ledger_his.js 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. 'use strict';
  2. /**
  3. *
  4. *
  5. * @author Mai
  6. * @date
  7. * @version
  8. */
  9. const fs = require('fs');
  10. const path = require('path');
  11. var util = require('util');
  // Append-mode log file that lives next to this script.
  var logPath = path.join(__dirname, 'update_revise.log');
  var logFile = fs.createWriteStream(logPath, { flags: 'a' });

  /**
   * Replacement for console.log that tees every message: the formatted line
   * (util.format semantics, newline-terminated) is appended to the log file
   * and then written to stdout.
   */
  console.log = function(...args) {
    const line = util.format(...args) + '\n';
    logFile.write(line);
    process.stdout.write(line);
  };
  18. const audit = require('../app/const/audit');
  19. const measureType = require('../app/const/message_type');
  20. const mysql = require('mysql');
  21. const oss = require('ali-oss');
  // The environment name is the first CLI argument. Note that splice also
  // removes every user-supplied argument from process.argv.
  const [config] = process.argv.splice(2);
  if (!['local', 'uat', 'default'].includes(config)) throw `参数错误: ${config}`;

  // The config module exports a factory; call it with the app descriptor to get the settings.
  const options = require(`../config/config.${config}`)({
    baseDir: `${__dirname}/app`,
    root: __dirname,
    name: 'calc',
  });

  // Shared MySQL connection pool used by querySql.
  const pool = mysql.createPool(options.mysql.client);

  // OSS client settings: bucket from the "his" client entry, credentials and
  // transport settings from the default entry.
  const ossOption = (({ clients, default: defaults }) => ({
    bucket: clients.his.bucket,
    accessKeyId: defaults.accessKeyId,
    accessKeySecret: defaults.accessKeySecret,
    endpoint: defaults.endpoint,
    timeout: defaults.timeout,
  }))(options.oss);
  const ossClient = new oss(ossOption);
  /**
   * Run one parameterised SQL statement on a pooled connection.
   *
   * A connection is borrowed from `pool`, the statement is executed, and the
   * connection is released before the promise settles.
   *
   * @param {string} sql - SQL text with `?` / `??` placeholders.
   * @param {Array} sqlParam - Values bound to the placeholders.
   * @returns {Promise<*>} Resolves with the driver's result (rows for a SELECT,
   *   the result packet for INSERT/UPDATE).
   * @throws Rejects with the driver error when no connection can be obtained
   *   or when the statement fails. The error is also written via console.log.
   */
  const querySql = function(sql, sqlParam) {
    return new Promise((resolve, reject) => {
      pool.getConnection((connErr, conn) => {
        if (connErr) {
          console.log(connErr);
          reject(connErr);
          return;
        }
        conn.query(sql, sqlParam, (queryErr, rows) => {
          // Hand the connection back first so it is released on both outcomes.
          conn.release();
          if (queryErr) {
            // Reject instead of resolving with undefined: a failed statement
            // must not look like a successful one to the caller.
            console.log(queryErr);
            reject(queryErr);
            return;
          }
          resolve(rows);
        });
      });
    });
  };
  /**
   * Resolve the sharded physical table name for a tender.
   *
   * Tables whose logical name contains "pos" are spread over 20 shards, all
   * others over 10; the shard index is the tender id modulo the shard count:
   *   ledger       -> zh_ledger_<id % 10>
   *   pos          -> zh_pos_<id % 20>
   *   revise_bills -> zh_revise_bills_<id % 10>
   *   revise_pos   -> zh_revise_pos_<id % 20>
   *
   * @param {{id: number}} tender - Tender whose id selects the shard.
   * @param {string} table - Logical table name without the `zh_` prefix.
   * @returns {string} Physical table name.
   */
  const getTableName = function(tender, table) {
    // `includes`, not `indexOf(...) > 0`: the plain 'pos' table matches at
    // index 0 and must also use the 20-shard layout.
    const shardCount = table.includes('pos') ? 20 : 10;
    return `zh_${table}_${tender.id % shardCount}`;
  };
  /**
   * Snapshot a tender's current ledger and register it as a history record.
   *
   * Steps, in order:
   *   1. load the tender's bills rows and upload them to OSS as JSON;
   *   2. load the tender's pos rows and upload them to OSS as JSON;
   *   3. insert a zh_ledger_history row that points at both files.
   * Both object keys share one timestamp and are prefixed with
   * `options.hisOssPath` when uploaded; the history row stores the keys
   * without that prefix.
   *
   * @param {Object} tender - Tender row; `id` and `project_id` are read.
   * @returns {Promise<*>} The `insertId` of the new zh_ledger_history row.
   * @throws {Error} When a query yields no result; upload errors propagate as-is.
   */
  const saveLedgerHis = async function(tender) {
    const now = new Date();
    const timestamp = now.getTime();
    const folder = `${tender.project_id}/${tender.id}/ledger`;

    const billsHis = `${folder}/bills${timestamp}.json`;
    const bills = await querySql('Select * From ?? where tender_id = ?', [getTableName(tender, 'ledger'), tender.id]);
    // Fail with context instead of a TypeError on `.length` when the query produced nothing.
    if (!bills) throw new Error(`saveLedgerHis: failed to load bills of tender ${tender.id}`);
    console.log(`saveBillsFile: ${billsHis}(${bills.length})`);
    await ossClient.put(options.hisOssPath + billsHis, Buffer.from(JSON.stringify(bills), 'utf8'));

    const posHis = `${folder}/pos${timestamp}.json`;
    const pos = await querySql('Select * From ?? where tid = ?', [getTableName(tender, 'pos'), tender.id]);
    if (!pos) throw new Error(`saveLedgerHis: failed to load pos of tender ${tender.id}`);
    console.log(`savePosFile: ${posHis}(${pos.length})`);
    await ossClient.put(options.hisOssPath + posHis, Buffer.from(JSON.stringify(pos), 'utf8'));

    const result = await querySql('Insert Into zh_ledger_history(pid, tid, in_time, bills_file, pos_file) Values(?, ?, ?, ?, ?)',
      [tender.project_id, tender.id, now, billsHis, posHis]);
    if (!result) throw new Error(`saveLedgerHis: insert into zh_ledger_history failed for tender ${tender.id}`);
    return result.insertId;
  };
  /**
   * Register an existing revise snapshot as a zh_ledger_history row.
   *
   * Nothing is uploaded here: the row reuses the file keys already stored on
   * the revise (`bills_file`, `pos_file`) and records the revise id, its
   * `corder` (written to the `rorder` column) and its `valid` flag.
   *
   * @param {Object} tender - Tender row; `id` and `project_id` are read.
   * @param {Object} revise - Revise row; `id`, `corder`, `valid`, `bills_file`
   *   and `pos_file` are read.
   * @returns {Promise<*>} The `insertId` of the new zh_ledger_history row.
   * @throws {Error} When the insert yields no result.
   */
  const saveReviseLedgerHis = async function (tender, revise) {
    const now = new Date();
    const params = [
      tender.project_id, tender.id, now,
      revise.bills_file, revise.pos_file,
      revise.id, revise.corder, revise.valid,
    ];
    const result = await querySql('Insert Into zh_ledger_history(pid, tid, in_time, bills_file, pos_file, rid, rorder, valid) Values(?, ?, ?, ?, ?, ?, ?, ?)',
      params);
    if (!result) throw new Error(`saveReviseLedgerHis: insert into zh_ledger_history failed for revise ${revise.id}`);
    return result.insertId;
  };
  /**
   * Back-fill ledger-history ids for one tender.
   *
   * Without revises: snapshot the current ledger and write the new history id
   * to the tender and to each of its checked stages.
   *
   * With revises (processed in in_time order):
   *   - a revise whose predecessor (checked status, corder one lower) exists
   *     gets that predecessor's his_id as pre_his_id;
   *   - a revise that has a bills_file gets its own history row; the tender's
   *     running history id follows the latest such revise flagged valid;
   *   - a revise without a bills_file and with status other than 1 is
   *     collected and patched at the end with the final history id.
   * The final history id is then written to the tender and its checked stages.
   *
   * @param {Object} t - Tender row; `id`, `name`, `project_id` and
   *   `measure_type` are read here or in the helpers this calls.
   * @returns {Promise<void>}
   */
  const doCompleteTender = async function(t) {
    // Write a history id onto the tender and onto each of its checked stages.
    const bindHistory = async (historyId) => {
      await querySql('Update zh_tender Set his_id = ? Where id = ?', [historyId, t.id]);
      const checkedStages = await querySql('Select * From zh_stage where tid = ? and status = ?', [t.id, audit.stage.status.checked]);
      for (const stage of checkedStages) {
        if (stage.status !== audit.stage.status.checked) continue;
        await querySql('Update zh_stage Set his_id = ? Where id = ?', [historyId, stage.id]);
      }
    };

    const reviseRows = await querySql('Select * From zh_ledger_revise where tid = ? order By in_time asc', [t.id]);
    const hasRevise = reviseRows.length > 0;

    if (!hasRevise) {
      const ledgerHis = await saveLedgerHis(t);
      console.log(`saveLedgerHis: ${t.name}(${t.id})`);
      await bindHistory(ledgerHis);
      return;
    }

    let currentHisId;
    const pendingRevises = [];
    for (const r of reviseRows) {
      // The predecessor's his_id may have been assigned earlier in this same
      // loop, so rows must be handled strictly in order.
      const previous = reviseRows.find((x) => x.status === audit.revise.status.checked && x.corder === r.corder - 1);
      if (previous) {
        await querySql('Update zh_ledger_revise Set pre_his_id = ? Where id = ?', [previous.his_id, r.id]);
      }

      if (!r.bills_file) {
        // NOTE(review): 1 is presumably the "unchecked" revise status - confirm against app/const/audit.
        if (r.status !== 1) pendingRevises.push(r);
        continue;
      }

      r.his_id = await saveReviseLedgerHis(t, r);
      if (r.valid) currentHisId = r.his_id;
      // NOTE(review): this stores the running history id, not r.his_id, so a
      // revise that is not valid receives the previous valid revise's id - confirm this is intended.
      await querySql('Update zh_ledger_revise Set his_id = ? Where id = ?', [currentHisId, r.id]);
    }

    // Take a fresh snapshot when no valid revise produced an id, or whenever the tender's measure type is gcl.
    if (!currentHisId || t.measure_type === measureType.gcl.value) {
      currentHisId = await saveLedgerHis(t);
    }
    await bindHistory(currentHisId);

    for (const r of pendingRevises) {
      await querySql('Update zh_ledger_revise Set pre_his_id = ?, his_id = ? Where id = ?', [currentHisId, currentHisId, r.id]);
    }
  };
  /**
   * Script entry point: back-fill history ids for every tender whose
   * ledger_status differs from the "uncheck" status, one tender at a time.
   * Any error is logged and stops the run; the connection pool is closed afterwards.
   *
   * @returns {Promise<void>}
   */
  const doComplete = async function() {
    try {
      const pendingTenders = await querySql(
        'Select * From zh_tender where ledger_status <> ?',
        [audit.ledger.status.uncheck]
      );
      // Sequential on purpose: each tender issues its own chain of dependent queries.
      for (const tender of pendingTenders) {
        await doCompleteTender(tender);
      }
    } catch (failure) {
      console.log(failure);
    }
    pool.end();
  };

  // Kick off the migration as soon as the script is loaded.
  doComplete();
  138. // const doCompleteTender2 = async function(t) {
  139. // const revise = await querySql('Select * From zh_ledger_revise where tid = ? order By in_time asc', [t.id]);
  140. // if (revise.length > 0) {
  141. // for (const r of revise) {
  142. // const preRevise = revise.find(x => { return x.status === audit.revise.status.checked && x.corder === r.corder - 1});
  143. // if (!preRevise && !r.pre_his_id) {
  144. // await querySql('Update zh_ledger_revise Set pre_his_id = ? Where id = ?', [r.his_id, r.id]);
  145. // }
  146. // }
  147. // }
  148. // };
  149. //
  150. // const doCompeletTest = async function() {
  151. // try {
  152. // const tenders = await querySql('Select * From zh_tender where ledger_status <> ?', [audit.ledger.status.uncheck]);
  153. // //const tenders = await querySql('Select * From zh_tender where id = ?', [3863]);
  154. // for (const t of tenders) {
  155. // await doCompleteTender2(t);
  156. // }
  157. // } catch (err) {
  158. // console.log(err);
  159. // }
  160. // pool.end();
  161. // };
  162. // doCompeletTest();