Changeset 1:c8c4ca70ccb5
- Timestamp:
- 03/29/08 16:09:11 (8 months ago)
- Branch:
- default
- Location:
- src/main/java/com/jehiah/memcached
- Files:
-
- 3 added
- 2 removed
- 6 modified
-
Cache.java (added)
-
CommandDecoder.java (modified) (8 diffs)
-
CommandMessage.java (modified) (1 diff)
-
LRUCache.java (modified) (9 diffs)
-
LRUCacheDelegate.java (added)
-
MCCache.java (deleted)
-
MCElement.java (modified) (1 diff)
-
Main.java (modified) (3 diffs)
-
MemCacheD.java (deleted)
-
MemCacheDaemon.java (added)
-
ServerSessionHandler.java (modified) (9 diffs)
Legend:
- Unmodified
- Added
- Removed
-
src/main/java/com/jehiah/memcached/CommandDecoder.java
r0 r1 15 15 16 16 /** 17 * MINA MessageDecoderAdapter responsible for parsing inbound lines from the memcached protocol session. 17 18 */ 18 19 public final class CommandDecoder extends MessageDecoderAdapter { … … 23 24 24 25 private static final int WORD_BUFFER_INIT_SIZE = 16; 26 25 27 private static final String SESSION_STATUS = "sessionStatus"; 26 28 29 /** 30 * Possible states that the current session is in. 31 */ 27 32 enum SessionState { 28 33 ERROR, 29 SET_WAIT_LINE,34 WAITING_FOR_DATA, 30 35 READY 31 36 } 32 37 38 /** 39 * Object for holding the current session status. 40 */ 33 41 final class SessionStatus implements Serializable { 42 // the state the session is in 34 43 public SessionState state; 44 45 // if we are waiting for more data, how much? 35 46 public int bytesNeeded; 47 48 // the current working command 36 49 public CommandMessage cmd; 37 50 … … 47 60 } 48 61 62 /** 63 * Checks the specified buffer is decodable by this decoder. 64 * 65 * In our case checks the session state to see if we are waiting for data. If we are, make sure 66 * that we actually have all the data we need. 67 * 68 * @return {@link #OK} if this decoder can decode the specified buffer. 69 * {@link #NOT_OK} if this decoder cannot decode the specified buffer. 70 * {@link #NEED_DATA} if more data is required to determine if the 71 * specified buffer is decodable ({@link #OK}) or not decodable 72 * {@link #NOT_OK}. 73 */ 49 74 public final MessageDecoderResult decodable(IoSession session, ByteBuffer in) { 50 75 // ask the session for its state, 51 76 SessionStatus sessionStatus = (SessionStatus) session.getAttribute(SESSION_STATUS); 52 if (sessionStatus != null && sessionStatus.state == SET_WAIT_LINE) {77 if (sessionStatus != null && sessionStatus.state == WAITING_FOR_DATA) { 53 78 if (in.remaining() <= sessionStatus.bytesNeeded) 54 79 return MessageDecoderResult.NEED_DATA; … … 57 82 } 58 83 84 /** 85 * Actually decodes inbound data from the memcached protocol session. 86 * 87 * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)} 88 * method with read data, and then the decoder implementation puts decoded 89 * messages into {@link ProtocolDecoderOutput}. 90 * 91 * @return {@link #OK} if finished decoding messages successfully. 92 * {@link #NEED_DATA} if you need more data to finish decoding current message. 93 * {@link #NOT_OK} if you cannot decode current message due to protocol specification violation. 94 * 95 * @throws Exception if the read data violated protocol specification 96 */ 59 97 public final MessageDecoderResult decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { 60 98 SessionStatus sessionStatus = (SessionStatus) session.getAttribute(SESSION_STATUS); 61 99 SessionStatus returnedSessionStatus; 62 if (sessionStatus != null && sessionStatus.state == SET_WAIT_LINE) {100 if (sessionStatus != null && sessionStatus.state == WAITING_FOR_DATA) { 63 101 if (in.remaining() < sessionStatus.bytesNeeded) 64 102 return MessageDecoderResult.NEED_DATA; … … 108 146 109 147 148 /** 149 * Process an individual completel protocol line and either passes the command for processing by the 150 * session handler, or (in the case of SET-type commands) partially parses the command and sets the session into 151 * a state to wait for additional data. 152 * @param parts the (originally space separated) parts of the command 153 * @param session the MINA IoSession 154 * @param out the MINA protocol decoder output to pass our command on to 155 * @return the session status we want to set the session to 156 */ 110 157 private SessionStatus processLine(List<String> parts, IoSession session, ProtocolDecoderOutput out) { 111 158 if (parts.isEmpty()) … … 134 181 cmd.element.data_length = size; 135 182 136 return new SessionStatus( SET_WAIT_LINE, size, cmd);183 return new SessionStatus(WAITING_FOR_DATA, size, cmd); 137 184 138 185 } else if (cmd.cmd == Commands.GET || … … 155 202 } 156 203 204 /** 205 * Handles the continuation of a SET/ADD/REPLACE command with the data it was waiting for. 206 * 207 * @param session the MINA IoSession 208 * @param out the MINA protocol decoder output which we signal with the completed command 209 * @param state the current session status (unused) 210 * @param remainder the bytes picked up 211 * @return the new status to set the session to 212 */ 157 213 private SessionStatus continueSet(IoSession session, ProtocolDecoderOutput out, SessionStatus state, byte[] remainder) { 158 214 state.cmd.element.data = remainder; … … 163 219 } 164 220 221 /** 222 * @return the current time in seconds 223 */ 165 224 public final int Now() { 166 225 return (int) (System.currentTimeMillis() / 1000); -
src/main/java/com/jehiah/memcached/CommandMessage.java
r0 r1 13 13 import java.util.ArrayList; 14 14 15 public class CommandMessage implements Serializable { 16 // should be byte buffers? 15 /** 16 * The payload object holding the parsed message. 17 */ 18 public final class CommandMessage implements Serializable { 17 19 public String cmd; 18 20 public MCElement element; -
src/main/java/com/jehiah/memcached/LRUCache.java
r0 r1 31 31 * <li> cache items are expelled when the total maximum size of the cache is 32 32 * reached 33 * <li> a ceiling parameter is available to provide a certain amount of reserved room 34 * in the cache 33 35 * <li> unusual: not built to handle the case of user input affecting the 34 36 * data; only handles case of changes to the datastore eventually refreshing 35 37 * the cache. Thus, this cache is not suitable for the case in which user 36 * input must be reflected. 38 * input must be reflected. That is: all changes to data must happen through "puts" 37 39 * </ul> 38 40 * <p/> 39 41 * <p/> 40 * Source: http://www.javapractices.com/Topic118.cjp42 * Based on (heavily altered) code from original source: http://www.javapractices.com/Topic118.cjp 41 43 */ 42 44 public final class LRUCache<ID_TYPE, ITEM_TYPE> { 43 45 44 private final Map<ID_TYPE, CacheEntry<ITEM_TYPE>> fItems; 46 /** 47 * Map containing the actual storage 48 */ 49 private final Map<ID_TYPE, CacheEntry<ITEM_TYPE>> items; 45 50 46 51 /** … … 78 83 this.maximumSize = maximumSize; 79 84 this.ceilingSize = ceilingSize; 80 fItems = new LinkedHashMap<ID_TYPE, CacheEntry<ITEM_TYPE>>(INITIAL_TABLE_SIZE) { 85 86 /** 87 * Creates a linked hash map which expels old elements on declared criterion 88 */ 89 items = new LinkedHashMap<ID_TYPE, CacheEntry<ITEM_TYPE>>(INITIAL_TABLE_SIZE) { 81 90 protected boolean removeEldestEntry(Map.Entry<ID_TYPE, CacheEntry<ITEM_TYPE>> eldest) { 82 91 if (size + ceilingSize > maximumSize || size() > maximumItems) { … … 104 113 startRead(); 105 114 try { 106 return fItems.containsKey(aId);115 return items.containsKey(aId); 107 116 } 108 117 finally { … … 125 134 ITEM_TYPE result; 126 135 try { 127 if ( fItems.containsKey(aId)) {128 result = fItems.get(aId).item;136 if (items.containsKey(aId)) { 137 result = items.get(aId).item; 129 138 if (result == null) { 130 139 throw new IllegalStateException("Stored item should not be null. Id:" + aId); … … 156 165 startWrite(); 157 166 try { 158 fItems.put(aId, new CacheEntry<ITEM_TYPE>(item_size, aItem));167 items.put(aId, new CacheEntry<ITEM_TYPE>(item_size, aItem)); 159 168 size += item_size; 160 169 } … … 165 174 166 175 /** 167 * Start from beginning, and remove all items from the cache; if cache is 168 * disabled, do nothing. 169 * <p/> 170 * Forces a re-population of all items into the cache. 171 */ 172 public void clear() { 176 * Remove an entry from the cache 177 * @param key the key for the entry 178 */ 179 public void remove(ID_TYPE key) { 173 180 startWrite(); 174 181 try { 175 fItems.clear(); 176 size = 0; 177 } 178 finally { 179 finishWrite(); 180 } 181 } 182 183 public Set<ID_TYPE> keys() { 184 startRead(); 185 try { 186 return fItems.keySet(); 187 } finally { 188 finishRead(); 189 } 190 } 191 192 public void remove(ID_TYPE keystring) { 193 startWrite(); 194 try { 195 CacheEntry<ITEM_TYPE> item = fItems.get(keystring); 182 CacheEntry<ITEM_TYPE> item = items.get(key); 196 183 if (item != null) { 197 fItems.remove(keystring);184 items.remove(key); 198 185 size -= item.size; 199 186 } … … 204 191 205 192 193 /** 194 * Start from beginning, and remove all items from the cache; if cache is 195 * disabled, do nothing. 196 * <p/> 197 * Forces a re-population of all items into the cache. 198 */ 199 public void clear() { 200 startWrite(); 201 try { 202 items.clear(); 203 size = 0; 204 } 205 finally { 206 finishWrite(); 207 } 208 } 209 210 /** 211 * @return the set of all keys in the cache 212 */ 213 public Set<ID_TYPE> keys() { 214 startRead(); 215 try { 216 return items.keySet(); 217 } finally { 218 finishRead(); 219 } 220 } 221 222 /** 223 * @return the number of entries in the cache 224 */ 225 public long count() { 226 startRead(); 227 try { 228 return items.size(); 229 } finally { 230 finishRead(); 231 } 232 } 233 234 /** 235 * @return the maximum number of items in the cache 236 */ 237 public int getMaximumItems() { 238 return maximumItems; 239 } 240 241 /** 242 * @return the size (in bytes) of all entries in the cache. 243 */ 244 public long getSize() { 245 return size; 246 } 247 248 /** 249 * @return the maximum capacity (in bytes) of the cache 250 */ 251 public long getMaximumSize() { 252 return maximumSize; 253 } 254 255 /** 256 * @return the reserved headroom/ceiling size (in bytes) 257 */ 258 public long getCeilingSize() { 259 return ceilingSize; 260 } 261 206 262 207 263 /** … … 218 274 } 219 275 276 /** 277 * Blocks of code in which the contents of fItems and fTimePlacedIntoCache 278 * are examined in any way must be surrounded by calls to <code>startRead</code> 279 * and <code>finishRead</code>. See documentation for ReadWriteLock. 280 * <p/> 281 * Translates the InterruptedException into a generic DataAccessException, to 282 * protect the higher layers from implementation details. 283 */ 220 284 private void finishRead() { 221 285 readWriteLock.readLock().unlock(); … … 236 300 } 237 301 302 /** 303 * Blocks of code in which the contents of fItems and fTimePlacedIntoCache 304 * are changed in any way must be surrounded by calls to <code>startWrite</code> and 305 * <code>finishWrite</code>. See documentation for ReadWriteLock. 306 * <p/> 307 * Translates the InterruptedException into a generic DataAccessException, to 308 * protect the higher layers from implementation details. 309 */ 238 310 private void finishWrite() { 239 311 readWriteLock.writeLock().unlock(); 240 312 } 241 313 242 //243 public long count() {244 startRead();245 try {246 return fItems.size();247 } finally {248 finishRead();249 }250 }251 252 public long getSize() {253 return size;254 }255 256 public long getMaximumSize() {257 return maximumSize;258 }259 260 public long getCeilingSize() {261 return ceilingSize;262 }263 264 public int getMaximumItems() {265 return maximumItems;266 }267 314 268 315 } -
src/main/java/com/jehiah/memcached/MCElement.java
r0 r1 18 18 public byte[] data; 19 19 public String keystring; 20 21 /*22 * Each item sent by the server looks like this:23 VALUE <key> <flags> <bytes>\r\n24 <data block>\r\n25 */26 public String toString() {27 return new StringBuffer().append("VALUE ").append(this.keystring).append(" ").append(this.flags).append(" ").append(this.data_length).append("\r\n").append(this.data).append("\r\n").toString();28 // return "VALUE " + this.keystring + " " + this.flags + " " +String.valueOf(this.data_length)+ "\r\n"+ this.data + "\r\n";29 }30 31 20 } -
src/main/java/com/jehiah/memcached/Main.java
r0 r1 39 39 40 40 if (cmdline.hasOption("help") || cmdline.hasOption("h")) { 41 System.out.println("Memcached Version " + MemCacheD .memcachedVersion);41 System.out.println("Memcached Version " + MemCacheDaemon.memcachedVersion); 42 42 System.out.println("http://jehiah.com/projects/memcached\n"); 43 43 … … 48 48 49 49 if (cmdline.hasOption("V")) { 50 System.out.println("Memcached Version " + MemCacheD .memcachedVersion);50 System.out.println("Memcached Version " + MemCacheDaemon.memcachedVersion); 51 51 return; 52 52 } … … 118 118 119 119 // create daemon and start it 120 new MemCacheD(addr, port, max_size, maxBytes, idle, verbose).start(); 120 MemCacheDaemon daemon = new MemCacheDaemon(); 121 LRUCacheDelegate cacheDelegate = new LRUCacheDelegate(max_size, max_size, 1024000); 122 daemon.setCacheDelegate(cacheDelegate); 123 daemon.start(); 121 124 } 122 125 -
src/main/java/com/jehiah/memcached/ServerSessionHandler.java
r0 r1 38 38 public int idle_limit; 39 39 public boolean verbose; 40 protected MCCache data;40 protected Cache cache; 41 41 42 42 public static CharsetEncoder ENCODER = Charset.forName("US-ASCII").newEncoder(); 43 43 44 public ServerSessionHandler( MCCache cache, String memcachedVersion, boolean verbosity, int idle) {44 public ServerSessionHandler(Cache cache, String memcachedVersion, boolean verbosity, int idle) { 45 45 initStats(); 46 this. data= cache;46 this.cache = cache; 47 47 48 48 this.started = Now(); … … 175 175 if (is_there(keystring)) { 176 176 if (time != 0) { 177 MCElement el = this. data.get(keystring);177 MCElement el = this.cache.get(keystring); 178 178 if (el.expire == 0 || el.expire > (Now() + time)) { 179 179 el.expire = Now() + time; // update the expire time 180 this. data.put(keystring, el);180 this.cache.put(keystring, el); 181 181 }// else it expire before the time we were asked to expire it 182 182 } else { 183 this. data.remove(keystring); // just remove it183 this.cache.remove(keystring); // just remove it 184 184 } 185 185 return "DELETED\r\n"; … … 209 209 public String set(MCElement e) { 210 210 set_cmds += 1;//update stats 211 this. data.put(e.keystring, e);211 this.cache.put(e.keystring, e); 212 212 return "STORED\r\n"; 213 213 } 214 214 215 215 public String get_add(String keystring, int mod) { 216 MCElement e = this.data.get(keystring); 216 // TODO make this threadsafe by cooperating more directly with the cache 217 MCElement e = this.cache.get(keystring); 217 218 if (e == null) { 218 219 get_misses += 1;//update stats … … 230 231 e.data = valueOf(old_val).getBytes(); // toString 231 232 e.data_length = e.data.length; 232 this. data.put(e.keystring, e); // save new value233 this.cache.put(e.keystring, e); // save new value 233 234 return valueOf(old_val) + "\r\n"; // return new value 234 235 } 235 236 236 237 /* 237 * this. data.containsKey() would work except it doesn't check the expire time238 * this.cache.containsKey() would work except it doesn't check the expire time 238 239 */ 239 240 public boolean is_there(String keystring) { 240 MCElement e = this. data.get(keystring);241 MCElement e = this.cache.get(keystring); 241 242 return e != null && !(e.expire != 0 && e.expire < Now()); 242 243 } … … 244 245 public MCElement get(String keystring) { 245 246 get_cmds += 1;//updates stats 246 MCElement e = this. data.get(keystring);247 MCElement e = this.cache.get(keystring); 247 248 if (e == null) { 248 249 get_misses += 1;//update stats … … 251 252 if (e.expire != 0 && e.expire < Now()) { 252 253 get_misses += 1;//update stats 254 255 // TODO shouldn't this actually remove the item from cache since it's expired? 253 256 return null; 254 257 } … … 273 276 274 277 if (arg.equals("keys")) { 275 Iterator itr = this. data.keys();278 Iterator itr = this.cache.keys(); 276 279 while (itr.hasNext()) { 277 280 builder.append("STAT key ").append(itr.next()).append("\r\n"); … … 291 294 builder.append("STAT time ").append(valueOf(Now())).append("\r\n"); 292 295 builder.append("STAT uptime ").append(valueOf(Now() - this.started)).append("\r\n"); 293 builder.append("STAT cur_items ").append(valueOf(this. data.count())).append("\r\n");294 builder.append("STAT limit_maxbytes ").append(valueOf(this. data.maxSize())).append("\r\n");295 builder.append("STAT current_bytes ").append(valueOf(this. data.size())).append("\r\n");296 builder.append("STAT cur_items ").append(valueOf(this.cache.count())).append("\r\n"); 297 builder.append("STAT limit_maxbytes ").append(valueOf(this.cache.maxSize())).append("\r\n"); 298 builder.append("STAT current_bytes ").append(valueOf(this.cache.size())).append("\r\n"); 296 299 builder.append("STAT free_bytes ").append(valueOf(Runtime.getRuntime().freeMemory())).append("\r\n"); 297 300 … … 313 316 314 317 public String flush_all(int expire) { 315 this. data.flushAll();318 this.cache.flushAll(); 316 319 317 320 return "OK\r\n";
