db_file._pike
Go to the documentation of this file.
1 /* Copyright (C) 2000-2007 Thomas Bopp, Thorsten Hampel, Ludger Merkens
2  *
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License as published by
5  * the Free Software Foundation; either version 2 of the License, or
6  * (at your option) any later version.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16  *
17  * $Id: db_file.pike,v 1.2 2009/08/07 15:22:36 nicke Exp $
18  */
19 #include <macros.h>
20 class db_file {
21 public:
22 
23 
24 
25 
26 /*
27  * /kernel/db_file
28  * this is the database file emulation, which stores a binary file
29  * in a sequence of 64k blobs in a SQL database.
30  */
31 
32 
/* ---- per-handle state -------------------------------------------- */
private int iID;              /* content id (doc_id) this handle works on */
private int iNextRecNbr;      /* rec_order of the next record to read or write */
private int iCurrPos;         /* byte offset reported by tell() */
private int iMaxRecNbr;       /* highest rec_order known for this content */
private int iMinRecNbr;       /* lowest rec_order known for this content */
private string sMode;         /* mode flags given to open(): r, w, a, t, c, x */
private string sReadBuf="";   /* data fetched but not yet handed out by read() */
private string sWriteBuf="";  /* data accepted but not yet stored as a record */
private int iFileSize=-1;     /* cached size in bytes, -1 while unknown */
private object readRecord;    /* record returned by _Database->read_from_database() */
private int iStopReader=0;    /* only reported by _sprintf() within this file */
private int iPrefetch=1;      /* non-zero: read() may request a prefetching reader */
private function fdb;         /* call as fdb() to obtain the SQL handle */
private int iLastAccess;      /* time() of the most recent read() */
47 
/**
 * Ask the database module for the connection belonging to a content id.
 *
 * @param int id - content id, passed unchanged to connect_db_file()
 * @return the array produced by _Database->connect_db_file(); open()
 *         unpacks it as ({ handle function, content id })
 */
array get_database_handle(int id)
{
    array connection = _Database->connect_db_file(id);
    return connection;
}
52 
/**
 * Constructor; simply forwards to open().
 *
 * @param int ID - content id (0 lets open() obtain a new one)
 * @param string mode - mode flags, see open()
 */
void create(int ID, string mode)
{
    // The result of open() is intentionally not inspected here.
    open(ID, mode);
}
56 
57 #define READ_ONCE 100
58 
/**
 * Open a database content with the given ID. If ID is 0 the mode is
 * forced to "wc" and the id delivered by get_database_handle() is used.
 *
 * @param int ID - (a content ID | 0)
 * @param string mode -
 * 'r' open file for reading
 * 'w' open file for writing
 * 'a' open file for append (use with 'w')
 * 't' truncate file at open (use with 'w')
 * 'c' create file if it doesn't exist (use with 'w')
 * 'x' fail if file already exists (use with 'c')
 *
 * Mode must _always_ contain exactly one 'r' or 'w'.
 * If no ID is given, mode 'wc' is assumed.
 * 'w' assumes 'a' unless 't'
 * 't' overrules 'a'
 * If both 'r' and 'w' are present, the read branch wins.
 *
 * @return On success the ID -- 0 otherwise. In read mode the ID passed
 *         by the caller is returned, in write mode the id in use.
 * @see Stdio.file
 */

int open(int ID, string mode)
{
    Sql.sql_result result;

    sMode = mode;
    iID = ID;
    if (!iID)
        sMode = "wc";                    // nothing to read without an id
    [fdb, iID] = get_database_handle(iID);

    iCurrPos = 0;

    if (search(sMode, "r") != -1)
    {
        // Determine the range of records that make up the content.
        result = fdb()->big_query("select min(rec_order), max(rec_order) "
                                  "from doc_data where doc_id =" + iID);
        array bounds = result->fetch_row();
        iMinRecNbr = (int) bounds[0];
        iMaxRecNbr = (int) bounds[1];    // both 0 if the content is missing
        iNextRecNbr = iMinRecNbr;

        // Load the first record right away.
        result = fdb()->big_query("select rec_data from doc_data where doc_id=" +
                                  iID + " and rec_order=" + iMinRecNbr);
        if (result->num_rows() == 1)
        {
            [sReadBuf] = result->fetch_row();
            sReadBuf = fdb()->unescape_blob(sReadBuf);
            if (strlen(sReadBuf) >= MAX_BUFLEN)
                iPrefetch = 1;           // more records expected, prefetch them
            else
                iFileSize = strlen(sReadBuf);  // the whole file is in the buffer
            iNextRecNbr++;
        }
        return ID;
    }

    if (search(sMode, "w") == -1)        // neither read nor write requested
        return 0;

    // Writing: continue behind the highest record stored so far.
    result = fdb()->big_query("select max(rec_order) from "
                              "doc_data where doc_id = " + iID);
    iNextRecNbr = objectp(result) ? ((int) result->fetch_row()[0]) + 1 : -1;

    if (search(sMode, "c") != -1)
    {
        if (iNextRecNbr == -1)
            iNextRecNbr = 0;             // treated as "create"
        else if (search(sMode, "x") != -1)
            return 0;                    // exclusive create refused
    }

    if (search(sMode, "t") != -1)
    {
        if (iNextRecNbr != -1)
            fdb()->big_query("delete from doc_data where doc_id = " + iID);
        iNextRecNbr = 1;
    }

    // 'w' without 'c' (and without 't') and no query result: give up.
    return (iNextRecNbr == -1) ? 0 : iID;
}
152 
153 private:
/**
 * Hand one record to _Database->write_into_database() and advance the
 * record counters. The cached file size is invalidated.
 *
 * @param string data - content of the record
 */
private void write_buf(string data)
{
    int rec = iNextRecNbr;

    _Database->write_into_database(iID, rec, data);
    iMaxRecNbr = rec;
    iNextRecNbr = rec + 1;
    iFileSize = -1;                      // size has to be recomputed
}
161 
162 public:
163 
164 private:
/**
 * Insert one record directly through the SQL handle (no detour via
 * _Database->write_into_database()).
 *
 * The record counters are advanced before the query runs; a failing
 * query is reported through FATAL() and the length is returned anyway.
 *
 * @param string data - content of the record
 * @return strlen(data)
 */
private int write_buf_now(string data)
{
    iFileSize = -1;                      // size has to be recomputed

    int rec = iNextRecNbr;
    string line = "insert into doc_data values('" +
                  fdb()->escape_blob(data) + "', " + iID + ", " + rec + ")";

    iMaxRecNbr = rec;
    iNextRecNbr = rec + 1;

    mixed err = catch { fdb()->big_query(line); };
    if (err) {
        FATAL("Fatal error while writting FILE into database: %O\n%O",err[0],err[1]);
    }
    return strlen(data);
}
178 
179 public:
180 
181 
182 
/**
 * Store the pending remainder of the write buffer as one record.
 *
 * Does nothing unless the file is open for writing and data is pending.
 * Previously the buffer was kept after being stored, so a second
 * flush() or a later write() stored the same bytes again. The size was
 * also recomputed when nothing was pending, which cached a value one
 * block too small (negative on a fresh file).
 *
 * NOTE(review): the size formula assumes every record before the one
 * written here holds MAX_BUFLEN bytes; after more than one partial
 * flush that assumption no longer holds.
 */
void flush()
{
    if (search(sMode, "w") == -1)
        return;

    int tail = strlen(sWriteBuf);
    if (!tail)
        return;     // nothing pending: leave iFileSize for sizeof() to derive

    write_buf(sWriteBuf);
    sWriteBuf = "";                      // stored now, must not be written twice
    iCurrPos += tail;                    // low_write() counts complete blocks only

    // All earlier records are taken as full blocks, the new one holds `tail`.
    iFileSize = (((iMaxRecNbr - iMinRecNbr) - 1) * MAX_BUFLEN) + tail;
}
193 
/**
 * Close the handle: optionally queue a callback with the database
 * module and ask a running reader to stop.
 *
 * Pending write data is NOT flushed here; call flush() beforehand.
 *
 * @param function close_callback - if given, it is passed to
 *        _Database->write_into_database(iID, 0, close_callback)
 * @return always 0 (the function used to end without a return
 *         statement, which yields the same value)
 */
int close(void|function close_callback)
{
    if (functionp(close_callback))
        _Database->write_into_database(iID, 0, close_callback);
    stop_reader();
    return 0;
}
199 
200 
/**
 * Destructor hook: closes the handle without a callback.
 */
void destroy()
{
    close();
}
204 
/* running total of bytes handed out by the buffered path of read() */
int sendByte = 0;
206 
/**
 * @return the time() value recorded by the most recent read() call
 *         (0 if read() has not been called in read mode yet)
 */
int get_last_access()
{
    return iLastAccess;
}
208 
209 protected:
/**
 * Run a pending restore hook of a read record, if there is one.
 *
 * The record's fullMutex is held while the hook is checked, called and
 * cleared; the key is released explicitly afterwards.
 *
 * @param object record - record providing fullMutex and restore
 */
void check_status(object record)
{
    object key = record->fullMutex->lock();

    mixed hook = record->restore;
    if ( functionp(hook) ) {
        hook(record);
        record->restore = 0;             // run the hook only once
    }

    destruct(key);
}
221 
222 public:
223 
/**
 * Read data from a content opened for reading.
 *
 * @param int|void nbytes - number of bytes wanted; 0 or omitted fetches
 *        the complete content with one query (this path ignores and
 *        does not update the buffer, the position and sendByte)
 * @param int|void notall - if set, at most one further record is
 *        fetched, so fewer than nbytes bytes may be returned
 * @return the data, or 0 if the handle is not in read mode or nothing
 *         is left to deliver
 *
 * When the record source reports the end of the data, sMode is reset to
 * "" so that later calls return 0 immediately.
 *
 * Fix: a reader is now also (re)started when exactly one record is
 * left (iNextRecNbr == iMaxRecNbr). The former test used '<' and
 * dropped the last record on that path.
 */
string read(int|void nbytes, int|void notall)
{
    if ( search(sMode,"r") == -1 )
        return 0;

    iLastAccess = time();

    if (!nbytes)                         // all the stuff -> no queuing
    {
        Sql.sql_result odbData =
            fdb()->big_query("select rec_data from doc_data "
                             "where doc_id=" + iID +
                             " order by rec_order");
        array chunks = ({});
        mixed row;
        while (row = odbData->fetch_row())
            chunks += ({ fdb()->unescape_blob(row[0]) });
        return chunks * "";
    }

    // Start prefetching unless the whole file is known to sit in sReadBuf.
    if ( !objectp(readRecord) &&
         (iFileSize == -1 || iFileSize > MAX_BUFLEN) && iPrefetch )
    {
        readRecord = _Database->read_from_database(iID, iNextRecNbr,
                                                   iMaxRecNbr, this_object());
    }

    array lbuf = ({ sReadBuf });
    int iSumLen = strlen(sReadBuf);
    mixed line = "";

    while ( iSumLen < nbytes && stringp(line) )
    {
        // No reader but records left (large files + seek): start one.
        if ( !readRecord && iNextRecNbr <= iMaxRecNbr )
        {
            iPrefetch = 1;
            readRecord = _Database->read_from_database(iID, iNextRecNbr,
                                                       iMaxRecNbr,
                                                       this_object());
        }

        if ( readRecord )
        {
            check_status(readRecord);
            line = readRecord->contFifo->read();
        }
        else
            line = 0;                    // small files: nothing more to fetch

        if ( stringp(line) )
        {
            lbuf += ({ line });
            iSumLen += strlen(line);
        }
        else
            sMode = "";                  // end of data: refuse further reads

        if ( notall )
            break;
    }
    sReadBuf = lbuf * "";

    if (!strlen(sReadBuf))
        return 0;

    string result;
    if (strlen(sReadBuf) <= nbytes)      // eof or notall: hand out everything
    {
        result = sReadBuf;
        sReadBuf = "";
    }
    else                                 // keep the surplus for the next call
    {
        result = sReadBuf[..nbytes-1];
        sReadBuf = sReadBuf[nbytes..];
    }
    iCurrPos += strlen(result);
    sendByte += strlen(result);
    return result;
}
310 
/**
 * Write data synchronously: complete blocks and the remaining tail are
 * inserted right away through write_buf_now().
 *
 * Fixes over the previous version:
 *  - nothing is inserted when the handle is not open for writing
 *    (low_write() returned -1 but the tail was written regardless);
 *  - no empty record is inserted when no tail is pending;
 *  - the tail is removed from the write buffer once stored, so a later
 *    flush()/write() cannot store it a second time.
 *
 * @param string data - data to append
 * @return number of bytes stored by this call, or -1 if the handle is
 *         not open for writing
 */
int write_now(string data)
{
    int written = low_write(data, write_buf_now);
    if (written < 0)
        return written;                  // not in write mode

    int tail = strlen(sWriteBuf);
    if (tail > 0)
    {
        write_buf_now(sWriteBuf);
        sWriteBuf = "";
        iCurrPos += tail;                // low_write() counted full blocks only
        // Same size formula flush() applies after storing a partial record.
        iFileSize = (((iMaxRecNbr - iMinRecNbr) - 1) * MAX_BUFLEN) + tail;
        written += tail;
    }
    return written;
}
317 
/**
 * Buffered write: complete MAX_BUFLEN blocks are passed to write_buf(),
 * the remainder stays in the write buffer.
 *
 * @param string data - data to append
 * @return number of bytes passed on as complete blocks, or -1 if the
 *         handle is not open for writing
 */
int write(string data)
{
    int stored = low_write(data, write_buf);
    return stored;
}
322 
323 protected:
/**
 * Append data to the write buffer and emit every complete block of
 * MAX_BUFLEN bytes through the given writer.
 *
 * @param string data - data to append
 * @param function write_buf - called once per complete block with the
 *        block as its only argument
 * @return number of bytes emitted as complete blocks (a multiple of
 *         MAX_BUFLEN), or -1 if the handle is not open for writing
 */
int low_write(string data, function write_buf)
{
    if (search(sMode, "w") == -1)
        return -1;

    sWriteBuf += data;

    int iWritten = 0;
    for (; strlen(sWriteBuf) >= MAX_BUFLEN; iWritten += MAX_BUFLEN)
    {
        write_buf(sWriteBuf[..MAX_BUFLEN-1]);
        sWriteBuf = sWriteBuf[MAX_BUFLEN..];
    }

    iCurrPos += iWritten;                // only emitted blocks move the position
    return iWritten;
}
340 
341 public:
342 
/**
 * @return a Stdio.Stat object whose size field is set to sizeof();
 *         no other field is filled in here
 */
object stat()
{
    object info = Stdio.Stat();
    info->size = sizeof();
    return info;
}
349 
/**
 * Size of the content in bytes.
 *
 * A cached value is returned when available. In write mode the size is
 * derived from the record counters plus the pending write buffer and is
 * not cached. Otherwise the length of the last record is queried and
 * the result is cached.
 *
 * @return the size in bytes
 */
int sizeof()
{
    if (iFileSize != -1)                 // already calculated
        return iFileSize;

    // Every record before the last one is taken as a full block.
    int iBlockBytes = (iMaxRecNbr - iMinRecNbr) * MAX_BUFLEN;

    if (search(sMode, "w") != -1)
        return iBlockBytes + strlen(sWriteBuf);

    Sql.sql_result res = fdb()->big_query(
        "select length(rec_data) from doc_data "
        "where doc_id =" + iID + " and rec_order=" + iMaxRecNbr);

    mixed row = res->fetch_row();
    int iLastChunkLen = arrayp(row) ? ((int) row[0]) : 0;

    iFileSize = iBlockBytes + iLastChunkLen;
    return iFileSize;
}
382 
/**
 * @return the content id this handle works on
 */
int dbContID()
{
    return iID;
}
386 
387 
388 private:
/**
 * Flag the current read record, if any, so that reading stops.
 * The record itself is kept in readRecord.
 */
private void stop_reader()
{
    if ( !objectp(readRecord) )
        return;

    //werror("Trying to stop read operation on "+ readRecord->iID+"\n");
    readRecord->stopRead = 1;
}
396 
397 public:
398 
/**
 * Seek in an already open database content to a specific offset.
 * If pos is negative it is taken relative to the current position,
 * otherwise it is an absolute offset from the start of the file.
 *
 * NOTE(review): the previous code computed the negative case from an
 * uninitialised variable, so the intended reference point (current
 * position vs. end of file as in Stdio.File->seek) should be confirmed.
 *
 * A failed seek leaves the read state untouched. On success a running
 * prefetch is stopped and dropped, so that the next read() continues
 * behind the record loaded here.
 *
 * Fixes over the previous version:
 *  - negative pos no longer degenerates into "stay where you are";
 *  - the offset inside the record is SeekPos % MAX_BUFLEN (it was
 *    SeekPos % iMinRecNbr, which is 0 for a first record number of 1
 *    and a division by zero for 0);
 *  - the record is passed through unescape_blob() as in open()/read();
 *  - the stale read record is discarded instead of being reused.
 *
 * @param int pos - the position as described above
 * @return The absolute new offset or -1 on failure
 * @see tell
 *
 * @caveats The old syntax from Stdio.File->seek with blocks is not
 * supported
 */

int seek(int pos)
{
    //werror("Seek(%d)\n", pos);

    int SeekPos = (pos < 0) ? iCurrPos + pos : pos;
    if (SeekPos < 0)
        return -1;                       // would end up before the start

    int SeekBlock = (SeekPos / MAX_BUFLEN) + iMinRecNbr;

    Sql.sql_result odbResult =
        fdb()->big_query("select rec_data from doc_data where doc_id=" +
                         iID + " and rec_order=" + SeekBlock);
    if (odbResult->num_rows() != 1)
        return -1;                       // no such record, nothing changed

    stop_reader();                       // stop the read thread ...
    readRecord = 0;                      // ... and discard its prefetched data
    iPrefetch = 0;

    string block = odbResult->fetch_row()[0];
    block = fdb()->unescape_blob(block);

    sReadBuf = block[(SeekPos % MAX_BUFLEN)..];
    iCurrPos = SeekPos;
    iNextRecNbr = SeekBlock + 1;
    return iCurrPos;
}
441 
/**
 * Report the current offset in an already open database content.
 * @return the absolute offset as tracked in iCurrPos
 */

int tell()
{
    int offset = iCurrPos;
    return offset;
}
451 
/**
 * Short textual description of this handle for debug output.
 * @return "kernel/db_file(id=<id>, stopped=<flag>)"
 */
string _sprintf()
{
    return sprintf("kernel/db_file(id=%d, stopped=%d)", iID, iStopReader);
}
456 
457 
458 };