tasks._pike
Go to the documentation of this file.
1 /* Copyright (C) 2000-2004 Thomas Bopp, Thorsten Hampel, Ludger Merkens
2  *
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License as published by
5  * the Free Software Foundation; either version 2 of the License, or
6  * (at your option) any later version.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16  *
17  * $Id: tasks.pike,v 1.2 2009/08/07 15:22:36 nicke Exp $
18  */
19 inherit "/kernel/module";
20 #include <macros.h>
21 #include <exception.h>
22 #include <attributes.h>
23 #include <classes.h>
24 #include <access.h>
25 #include <database.h>
26 #include <config.h>
27 class tasks : public module{
28 public:
29 
30 
31 
32 
33 import Task;
34 
35 
36 
37 #ifdef TASK_DEBUG
38 #define DEBUG_TASK(s, args...) write("TASKS: "+s+"\n", args)
39 #else
40 #define DEBUG_TASK(s, args...)
41 #endif
42 
// Task registry. Integer keys (task ids) map to a single task object;
// object keys (users) map to the array of tasks added for that user.
mapping mTasks = ([ ]);

// Most recently assigned task id; add_task() increments it before use.
int tid = 0;

// Queue feeding the worker thread; created in init_module().
Thread.Queue taskQueue;

// Optional log object written to by task_debug(). It is not assigned
// anywhere in the code shown here.
object log;
48 
/**
 * Append a timestamped message to the content of the log object.
 * Does nothing when no log object is set.
 *
 * @param string s - the message to append
 */
void task_debug(string s)
{
    if ( !objectp(log) )
        return;

    string previous = log->get_content();
    string entry = "On "+ctime(time())+": &nbsp;&nbsp;"+ s+"<br/>";
    log->set_content(previous + entry);
}
56 
57 
58 private:
/**
 * Module initialisation: registers the storage callbacks for the task
 * store, creates the task queue and starts the worker thread.
 */
void init_module()
{
    // retrieve_tasks/restore_tasks are the save/load callbacks for STORE_TASKS
    add_data_storage(STORE_TASKS, retrieve_tasks, restore_tasks);

    // a single worker thread consumes the queue
    taskQueue = Thread.Queue();
    start_thread(worker);
}
67 
68 public:
69 
/**
 * Main loop of the worker thread. Reads one task at a time from the task
 * queue, runs it through f_run_task() and measures how long it took.
 * When the "log_slow_tasks" config value is non-zero and the task took
 * more milliseconds than that value, an entry is written to the
 * "slow_requests" log. Errors are caught per task so the loop keeps
 * running: errors flagged E_ACCESS go to the "security" log, everything
 * else is reported through FATAL.
 */
void worker()
{
    for ( ;; ) {
        mixed failure = catch {
            object job = taskQueue->read();
            // describe the task before running it
            string label = job->describe();
            float seconds = gauge(f_run_task(job));
            int threshold = (int)_Server->get_config("log_slow_tasks");
            if ( threshold ) {
                int elapsed_ms = (int)(seconds*1000.0);
                if ( elapsed_ms > threshold )
                    get_module("log")->log("slow_requests", LOG_LEVEL_INFO,
                                           "%s Task %s took %d ms",
                                           timelib.event_time(time()), label,
                                           elapsed_ms);
            }
        };

        if ( failure ) {
            if ( arrayp(failure) && sizeof(failure) == 3 &&
                 (failure[2] & E_ACCESS) )
                get_module("log")->log("security", LOG_LEVEL_DEBUG, "%O\n%O",
                                       failure[0], failure[1]);
            else
                FATAL("Task failed with error: %O\n%O", failure[0], failure[1]);
            continue;
        }

        DEBUG_TASK("Task succeeded !");
    }
}
97 
98 private:
/**
 * Storage callback: build the data to be saved for STORE_TASKS.
 * Only entries of mTasks keyed by an object are included, and of their
 * tasks only those whose func is not a function value and whose obj is
 * an object. Each such task is stored as a mapping of its indices and
 * values.
 *
 * @return mapping with "tasks" (object -> array of task mappings) and
 *         "id" (the current task id counter)
 * @throws E_ACCESS if the caller is not the database
 */
mapping retrieve_tasks()
{
    if ( CALLER != _Database )
        THROW("Caller is not database !", E_ACCESS);

    mapping stored = ([ ]);
    foreach ( indices(mTasks), mixed key ) {
        if ( !objectp(key) )
            continue;

        array entries = ({ });
        foreach ( mTasks[key], object t ) {
            if ( !objectp(t) || functionp(t->func) || !objectp(t->obj) )
                continue;
            entries += ({ mkmapping(indices(t), values(t)) });
        }
        stored[key] = entries;
    }
    return ([ "tasks": stored, "id": tid, ]);
}
115 
116 public:
117 
118 private:
/**
 * Storage callback: re-create the saved tasks from the data produced by
 * retrieve_tasks(). Every stored task mapping is passed to add_task()
 * for the object it was stored under; afterwards the task id counter is
 * set to the saved value.
 *
 * @param mapping data - mapping with the entries "tasks" and "id"
 * @throws E_ACCESS if the caller is not the database
 */
void restore_tasks(mapping data)
{
    if ( CALLER != _Database )
        THROW("Caller is not database !", E_ACCESS);

    mapping stored = data["tasks"];
    foreach ( indices(stored), object owner ) {
        foreach ( stored[owner], mapping entry ) {
            LOG("Task="+sprintf("%O\n", entry));
            add_task(owner, entry->obj, entry->func,
                     entry->params, entry->descriptions);
        }
    }
    // add_task() advanced the counter; restore the saved value
    tid = data->id;
}
133 
134 public:
135 
136 
/**
 * Add a task for a user or a timed task.
 *
 * A new Task object is created, given the next task id and registered
 * in mTasks under that id. If user_t is an object, the task is also
 * appended to that object's task list and queued for execution right
 * away. If user_t is an integer, it is taken as the execution time
 * (compared against time()): a time in the future schedules the task
 * after the remaining delay, anything else queues it immediately.
 *
 * @param object|int user_t - the user the task belongs to, or the
 *                            time at which the task should be executed
 * @param object obj - the object to call a function in
 * @param string|function func - the task function to call
 * @param array args - the arguments passed to the task function
 * @param mapping desc - stored in the task's descriptions field
 * @return the resulting task object (see Task.pmod)
 */
object
add_task(int|object user_t, object obj, string|function func, array args, mapping desc)
{
    object task = Task();
    task->obj = obj;
    task->func = func;
    task->params = args;
    task->descriptions = desc;
    task->tid = ++tid;
    task->exec_time = 0;

    mTasks[tid] = task;

    if ( objectp(user_t) ) {
        // user related task: keep it in the user's task list as well
        if ( !arrayp(mTasks[user_t]) )
            mTasks[user_t] = ({ });
        mTasks[user_t] += ({ task });
        run_task(task);
    }
    else {
        // timed task: execute immediately or once exec_time is reached
        task->exec_time = user_t;
        int delay = task->exec_time - time();
        if ( delay > 0 ) {
            DEBUG_TASK("New Task running later !");
            // The delay is the time remaining until exec_time. The
            // previous code passed time() - exec_time, which is negative
            // for every task scheduled in the future.
            call(run_task, delay, task);
        }
        else {
            run_task(task);
            DEBUG_TASK("New Task immediate execution !");
        }
    }

    DEBUG_TASK("added %O (id="+task->tid+")", func);
    require_save(STORE_TASKS);

    return task;
}
186 
/**
 * Get the tasks registered for a user.
 *
 * @param object user - the user whose tasks are requested
 * @return array of task objects with zero entries removed; an empty
 *         array when nothing is registered for the user
 */
array get_tasks(object user)
{
    array pending = mTasks[user];
    if ( !pending )
        pending = ({ });
    return pending - ({ 0 });
}
192 
193 
/**
 * Look up a task by its id.
 *
 * @param int tid - the task id
 * @return the entry of mTasks stored under that id (zero if there is none)
 */
object get_task(int tid)
{
    object task = mTasks[tid];
    return task;
}
198 
/**
 * Get the complete task registry.
 *
 * @return the mTasks mapping itself (not a copy)
 */
mapping _get_tasks()
{
    mapping registry = mTasks;
    return registry;
}
203 
/**
 * Clear the task list of a user and mark the task store as needing
 * to be saved.
 *
 * @param object user - the user whose task list is reset
 */
void tasks_done(object user)
{
    DEBUG_TASK("All Tasks done for "+ user->get_identifier());

    // replace the user's list with an empty one
    mTasks[user] = ({ });

    require_save(STORE_TASKS);
}
210 
211 protected:
/**
 * Execute a single task.
 *
 * The function to call is t->func itself when it is a function value;
 * otherwise it is looked up by name in t->obj via find_function(). A
 * task that cannot be executed (no object to look the function up in,
 * or no such function) is reported, removed from mTasks and not run.
 *
 * Otherwise the effective user is set to t->user and the function is
 * called with t->params as arguments (without arguments when params is
 * not an array). An error raised by the function is caught and
 * reported. In either case the task is then removed from mTasks, its
 * task_done() is called and the task store is marked for saving.
 *
 * @param object t - the task to execute
 */
void f_run_task(object t)
{
    function handler;
    mixed err;

    DEBUG_TASK("Tasks: looking for %O\n", t->func);
    if ( functionp(t->func) )
        handler = t->func;
    else {
        if ( !objectp(t->obj) ) {
            FATAL("Invalid Task %O\n", t->describe());
            m_delete(mTasks, t->tid);
            require_save(STORE_TASKS);
            return;
        }
        handler = t->obj->find_function(t->func);
    }

    if ( !functionp(handler) ) {
        FATAL("Cannot find task '"+t->func+"' to execute !");
        // Drop the task like any other task that cannot be executed;
        // otherwise it would stay registered in mTasks indefinitely.
        m_delete(mTasks, t->tid);
        require_save(STORE_TASKS);
        return;
    }
    DEBUG_TASK("Running task " + t->tid + "(%O in %O) as %O\n",
               t->func,
               t->obj,
               t->user);
    DEBUG_TASK("Current user is %O", this_user());

    // run the task with the user recorded by run_task()
    seteuid(t->user);
    if ( !arrayp(t->params) )
        err = catch(handler());
    else
        err = catch(handler(@t->params));

    if ( err != 0 ) {
        FATAL( "Error while running task (%O in %O) as %O: %O\n%O",
               t->func, t->obj, t->user, err[0], err[1] );
    }
    else {
        // only report success when the task function did not fail
        DEBUG_TASK("Task " + t->tid + " success !");
    }

    // the task is finished either way: unregister it and notify it
    m_delete(mTasks, t->tid);
    err = catch(t->task_done());
    if ( err != 0 )
        FATAL( "Error while ending task (%O in %O) as %O: %O",
               t->func, t->obj, t->user, err );

    require_save(STORE_TASKS);
}
261 
262 public:
263 
/**
 * Queue a task for execution by the worker thread. The task's user is
 * set to the current effective user, or to this_user() when there is
 * no effective user.
 *
 * @param int|object tid - a task object, or the id of a task in mTasks
 * @return 1 if the task was queued, 0 if no task object was found
 */
int run_task(int|object tid)
{
    object t = objectp(tid) ? tid : mTasks[tid];

    if ( !objectp(t) ) {
        FATAL("Unable to perform task " + tid + ": no object.");
        return 0;
    }

    t->user = geteuid() || this_user();
    DEBUG_TASK("Run Task %O in %O (id=%d) as %O",
               t->func,
               t->obj,
               t->tid,
               t->user);

    // hand the task over to the worker thread
    taskQueue->write(t);
    return 1;
}
288 
/**
 * @return the identifier of this module, "tasks"
 */
string get_identifier()
{
    return "tasks";
}
290 
/**
 * Create an exit to a group's workroom inside a user's workroom,
 * unless the user's workroom already contains an exit leading there.
 *
 * @param object grp - the group whose workroom the exit leads to
 * @param object user - the user whose workroom receives the exit
 */
void create_group_exit(object grp, object user)
{
    object target = grp->query_attribute(GROUP_WORKROOM);
    object workroom = user->query_attribute(USER_WORKROOM);
    array present = workroom->get_inventory_by_class(CLASS_EXIT);

    // nothing to do if an exit to the group's workroom already exists
    foreach ( present, object existing ) {
        if ( existing->get_exit() == target )
            return;
    }

    object exit_factory = _Server->get_factory(CLASS_EXIT);
    mapping spec = ([
        "name": grp->parent_and_group_name() + " workarea",
        "exit_to": target,
    ]);
    object new_exit = exit_factory->execute(spec);
    new_exit->move(workroom);
}
308 
/**
 * Add a user to a group and create the exit to the group's workroom
 * in the user's workroom.
 *
 * @param object grp - the group to join
 * @param object user - the user joining the group
 */
void join_invited_group(object grp, object user)
{
    // membership first, then the exit into the group's workroom
    grp->add_member(user);
    create_group_exit(grp, user);
}
314 
/**
 * Delete the first exit in a user's workroom that leads to the given
 * group's workroom. Does nothing if the user has no workroom object.
 *
 * @param object grp - the group whose workroom the exit leads to
 * @param object user - the user whose workroom is searched
 */
void remove_group_exit(object grp, object user)
{
    object workroom = user->query_attribute(USER_WORKROOM);
    if ( !objectp(workroom) )
        return;

    foreach ( workroom->get_inventory_by_class(CLASS_EXIT), object candidate ) {
        if ( candidate->get_exit() == grp->query_attribute(GROUP_WORKROOM) ) {
            // only the first matching exit is removed
            candidate->delete();
            return;
        }
    }
}
327 
328 
329 };