Planeshift
genrefqueue.h
Go to the documentation of this file.
1 /*
2 * genqueue.h by Matze Braun <MatzeBraun@gmx.de>
3 *
4 * Copyright (C) 2001 Atomic Blue (info@planeshift.it, http://www.atomicblue.org)
5 *
6 *
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License
9 * as published by the Free Software Foundation (version 2 of the License)
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 *
18 */
19 
20 #ifndef __GENQUEUE_H__
21 #define __GENQUEUE_H__
22 
23 #include <csutil/ref.h>
24 #include <csutil/threading/mutex.h>
25 #include <csutil/threading/condition.h>
26 
27 #include "util/pserror.h"
28 
38 template <class queuetype, template <class T> class refType = csRef >
40 {
41 public:
42  GenericRefQueue(unsigned int maxsize = 500)
43  {
44  /* we make the buffer 1 typ bigger, so we can avoid one check and one
45  variable when testing if buffer is full */
46  qsize = maxsize;
47  qbuffer = new refType<queuetype>[qsize + 1]();
48  qstart = qend = 0;
49  }
50 
52  {
53  delete[] qbuffer;
54  }
55 
58  bool AddWait(queuetype* msg, csTicks timeout = 0)
59  {
60  // is there's a space in the queue left just add it
61  CS::Threading::RecursiveMutexScopedLock lock(mutex);
62  while(true)
63  {
64  bool added = Add(msg);
65  if (added)
66  {
67  return true;
68  }
69  Error1("Queue full! Waiting.\n");
70 
71  // Wait release mutex before waiting so that it is possible to
72  // add new messages.
73  if (!datacondition.Wait(mutex, timeout))
74  {
75  // Timed out waiting for new message
76  return false;
77  }
78  }
79  }
80 
82  bool Add(queuetype* msg)
83  {
84  unsigned int tqend;
85 
86  CS::Threading::RecursiveMutexScopedLock lock(mutex);
87 
88  if (!msg->GetPending())
89  {
90  tqend = (qend + 1) % qsize;
91  // check if queue is full
92  if (tqend == qstart)
93  {
94  Interrupt();
95  return false;
96  }
97  // check are we having a refcount race (in which msg would already be destroyed)
98  CS_ASSERT(msg->GetRefCount() > 0);
99  // add Message to queue
100  qbuffer[qend]=msg;
101  qend=tqend;
102 
103  msg->SetPending(true);
104 
105  Interrupt();
106  }
107  return true;
108  }
109 
110  // Peeks at the next message from the queue but does not remove it.
111  csPtr<queuetype> Peek()
112  {
113  CS::Threading::RecursiveMutexScopedLock lock(mutex);
114 
115  csRef<queuetype> ptr;
116 
117  unsigned int qpointer = qstart;
118 
119  // if this is a weakref queue we should skip over null entries
120  while(!ptr.IsValid())
121  {
122  // check if queue is empty
123  if (qpointer == qend)
124  {
125  return 0;
126  }
127 
128  // removes Message from queue
129  ptr = qbuffer[qpointer];
130 
131  qpointer = (qpointer + 1) % qsize;
132  }
133 
134  return csPtr<queuetype>(ptr);
135  }
136 
142  csPtr<queuetype> Get()
143  {
144  CS::Threading::RecursiveMutexScopedLock lock(mutex);
145 
146  csRef<queuetype> ptr;
147 
148  // if this is a weakref queue we should skip over null entries
149  while(!ptr.IsValid())
150  {
151  // check if queue is empty
152  if (qstart == qend)
153  {
154  Interrupt();
155  return 0;
156  }
157 
158  // removes Message from queue
159  ptr = qbuffer[qstart];
160  qbuffer[qstart] = 0;
161 
162  qstart = (qstart + 1) % qsize;
163  }
164 
165  ptr->SetPending(false);
166  Interrupt();
167 
168  return csPtr<queuetype>(ptr);
169  }
170 
172  csPtr<queuetype> GetWait(csTicks timeout)
173  {
174  // is there's a message in the queue left just return it
175  CS::Threading::RecursiveMutexScopedLock lock(mutex);
176  while(true)
177  {
178  csRef<queuetype> temp = Get();
179  if (temp)
180  {
181  return csPtr<queuetype> (temp);
182  }
183 
184  // Wait release mutex before waiting so that it is possible to
185  // add new messages.
186  if (!datacondition.Wait(mutex, timeout))
187  {
188  // Timed out waiting for new message
189  return 0;
190  }
191  }
192  CS_ASSERT(false);
193  return 0;
194  }
195 
199  void Interrupt()
200  {
201  datacondition.NotifyOne();
202  }
203 
207  unsigned int Count()
208  {
209  CS::Threading::RecursiveMutexScopedLock lock(mutex);
210  if(qend < qstart)
211  return qend + qsize - qstart;
212  else
213  return qend - qstart;
214  }
215 
216  bool IsFull()
217  {
218  CS::Threading::RecursiveMutexScopedLock lock(mutex);
219  return ((qend + 1) % qsize == qstart);
220  }
221 protected:
222 
223  refType<queuetype>* qbuffer;
224  unsigned int qstart, qend, qsize;
225  CS::Threading::RecursiveMutex mutex;
226  CS::Threading::Condition datacondition;
227 };
228 
231 #endif
refType< queuetype > * qbuffer
Definition: genrefqueue.h:223
unsigned int Count()
Number of items in the queue.
Definition: genrefqueue.h:207
unsigned int qend
Definition: genrefqueue.h:224
csPtr< queuetype > Peek()
Definition: genrefqueue.h:111
bool AddWait(queuetype *msg, csTicks timeout=0)
like above, but waits to add the next message, if the queue is full be careful with this...
Definition: genrefqueue.h:58
csPtr< queuetype > Get()
This gets the next message from the queue, it is then removed from the queue.
Definition: genrefqueue.h:142
CS::Threading::RecursiveMutex mutex
Definition: genrefqueue.h:225
bool Add(queuetype *msg)
This adds a message to the queue and waits if it is full.
Definition: genrefqueue.h:82
GenericRefQueue(unsigned int maxsize=500)
Definition: genrefqueue.h:42
A queue of smart pointers with locking facilties for multi-threading.
Definition: genrefqueue.h:39
csPtr< queuetype > GetWait(csTicks timeout)
like above, but waits for the next message, if the queue is empty
Definition: genrefqueue.h:172
void Interrupt()
This function interrupt the queue if it is waiting.
Definition: genrefqueue.h:199
unsigned int qstart
Definition: genrefqueue.h:224
#define Error1(a)
Definition: log.h:274
unsigned int qsize
Definition: genrefqueue.h:224
CS::Threading::Condition datacondition
Definition: genrefqueue.h:226