Changeset 14:8b4b6177d6a2
- Timestamp:
- 04/01/08 15:16:49 (5 months ago)
- Author:
- ryan@…
- Message:
-
Added: cas/gets, and noreply option on sets
- Files:
-
Legend:
- Unmodified
- Added
- Removed
-
|
r10
|
r14
|
|
| 126 | 126 | // create daemon and start it |
| 127 | 127 | MemCacheDaemon daemon = new MemCacheDaemon(); |
| 128 | | LRUCacheStorageDelegate cacheStorage = new LRUCacheStorageDelegate(max_size, max_size, 1024000); |
| | 128 | LRUCacheStorageDelegate cacheStorage = new LRUCacheStorageDelegate(max_size, maxBytes, 1024000); |
| 129 | 129 | daemon.setCache(new Cache(cacheStorage)); |
| 130 | 130 | daemon.setAddr(addr); |
-
|
r10
|
r14
|
|
| 32 | 32 | public int getMisses; |
| 33 | 33 | |
| | 34 | public long casCounter; |
| | 35 | |
| 34 | 36 | protected CacheStorage cacheStorage; |
| | 37 | |
| | 38 | |
| | 39 | public enum StoreResponse { |
| | 40 | STORED, NOT_STORED, EXISTS, NOT_FOUND |
| | 41 | } |
| 35 | 42 | |
| 36 | 43 | |
| … |
… |
|
| 40 | 47 | */ |
| 41 | 48 | private final ReadWriteLock cacheReadWriteLock; |
| 42 | | |
| 43 | 49 | |
| 44 | 50 | /** |
| … |
… |
|
| 90 | 96 | * @return the message response string |
| 91 | 97 | */ |
| 92 | | protected boolean add(MCElement e) { |
| 93 | | try { |
| 94 | | startCacheRead(); |
| 95 | | return !is_there(e.keystring) && set(e); |
| | 98 | protected StoreResponse add(MCElement e) { |
| | 99 | try { |
| | 100 | startCacheRead(); |
| | 101 | if (is_there(e.keystring)) return set(e); |
| | 102 | else return StoreResponse.NOT_STORED; |
| 96 | 103 | } finally { |
| 97 | 104 | finishCacheRead(); |
| … |
… |
|
| 105 | 112 | * @return the message response string |
| 106 | 113 | */ |
| 107 | | public boolean replace(MCElement e) { |
| 108 | | try { |
| 109 | | startCacheRead(); |
| 110 | | return is_there(e.keystring) && set(e); |
| 111 | | } finally { |
| 112 | | finishCacheRead(); |
| 113 | | } |
| 114 | | } |
| | 114 | public StoreResponse replace(MCElement e) { |
| | 115 | try { |
| | 116 | startCacheRead(); |
| | 117 | if (is_there(e.keystring)) return set(e); |
| | 118 | else return StoreResponse.NOT_STORED; |
| | 119 | } finally { |
| | 120 | finishCacheRead(); |
| | 121 | } |
| | 122 | } |
| | 123 | |
| 115 | 124 | |
| 116 | 125 | /** |
| … |
… |
|
| 120 | 129 | * @return the message response string |
| 121 | 130 | */ |
| 122 | | protected boolean set(MCElement e) { |
| | 131 | protected StoreResponse set(MCElement e) { |
| 123 | 132 | try { |
| 124 | 133 | startCacheWrite(); |
| 125 | 134 | setCmds += 1;//update stats |
| | 135 | |
| | 136 | // increment the CAS counter; put in the new CAS |
| | 137 | e.cas_unique = casCounter++; |
| | 138 | |
| 126 | 139 | this.cacheStorage.put(e.keystring, e); |
| 127 | | return true; |
| | 140 | return StoreResponse.STORED; |
| | 141 | } finally { |
| | 142 | finishCacheWrite(); |
| | 143 | } |
| | 144 | } |
| | 145 | |
| | 146 | public StoreResponse cas(Long cas_key, MCElement e) { |
| | 147 | try { |
| | 148 | startCacheWrite(); |
| | 149 | // have to get the element |
| | 150 | MCElement element = get(e.keystring); |
| | 151 | if (element == null) |
| | 152 | return StoreResponse.NOT_FOUND; |
| | 153 | |
| | 154 | if (element.cas_unique == cas_key) { |
| | 155 | // cas_unique matches, now set the element |
| | 156 | return set(e); |
| | 157 | } else { |
| | 158 | // cas didn't match; someone else beat us to it |
| | 159 | return StoreResponse.EXISTS; |
| | 160 | } |
| | 161 | |
| 128 | 162 | } finally { |
| 129 | 163 | finishCacheWrite(); |
| … |
… |
|
| 157 | 191 | e.data = valueOf(old_val).getBytes(); // toString |
| 158 | 192 | e.data_length = e.data.length; |
| | 193 | |
| | 194 | // assign new cas id |
| | 195 | e.cas_unique = casCounter++; |
| | 196 | |
| 159 | 197 | this.cacheStorage.put(e.keystring, e); // save new value |
| 160 | 198 | return old_val; |
| … |
… |
|
| 345 | 383 | return getMisses; |
| 346 | 384 | } |
| | 385 | |
| 347 | 386 | } |
-
|
r10
|
r14
|
|
| 178 | 178 | if (cmd.cmd == Commands.ADD || |
| 179 | 179 | cmd.cmd == Commands.SET || |
| 180 | | cmd.cmd == Commands.REPLACE) { |
| | 180 | cmd.cmd == Commands.REPLACE || |
| | 181 | cmd.cmd == Commands.CAS) { |
| 181 | 182 | |
| 182 | 183 | // if we don't have all the parts, it's malformed |
| 183 | | if (parts.size() != 5) { |
| | 184 | if (parts.size() < 5) { |
| 184 | 185 | return new SessionStatus(ERROR); |
| 185 | 186 | } |
| | 187 | |
| 186 | 188 | |
| 187 | 189 | int size = Integer.parseInt(parts.get(4)); |
| … |
… |
|
| 196 | 198 | cmd.element.data_length = size; |
| 197 | 199 | |
| | 200 | // look for cas and "noreply" elements |
| | 201 | if (parts.size() > 5) { |
| | 202 | int noreply = cmd.cmd == Commands.CAS ? 6 : 5; |
| | 203 | if (cmd.cmd == Commands.CAS) { |
| | 204 | cmd.cas_key = Long.valueOf(parts.get(5)); |
| | 205 | } |
| | 206 | |
| | 207 | if (parts.size() == noreply + 1 && parts.get(noreply).equalsIgnoreCase("noreply")) |
| | 208 | cmd.noreply = true; |
| | 209 | |
| | 210 | } |
| | 211 | |
| 198 | 212 | return new SessionStatus(WAITING_FOR_DATA, size, cmd); |
| 199 | 213 | |
| 200 | 214 | } else if (cmd.cmd == Commands.GET || |
| | 215 | cmd.cmd == Commands.GETS || |
| 201 | 216 | cmd.cmd == Commands.INCR || |
| 202 | 217 | cmd.cmd == Commands.DECR || |
-
|
r10
|
r14
|
|
| 26 | 26 | public MCElement element; |
| 27 | 27 | public ArrayList<String> keys; |
| | 28 | public boolean noreply; |
| | 29 | public Long cas_key; |
| 28 | 30 | |
| 29 | 31 | public CommandMessage(String cmd) { |
-
|
r10
|
r14
|
|
| 20 | 20 | public interface Commands { |
| 21 | 21 | String GET = "GET".intern(); |
| | 22 | String GETS = "GETS".intern(); |
| 22 | 23 | String DELETE = "DELETE".intern(); |
| 23 | 24 | String DECR = "DECR".intern(); |
| … |
… |
|
| 26 | 27 | String ADD = "ADD".intern(); |
| 27 | 28 | String SET = "SET".intern(); |
| | 29 | String CAS = "CAS".intern(); |
| 28 | 30 | String STATS = "STATS".intern(); |
| 29 | 31 | String VERSION = "VERSION".intern(); |
-
|
r10
|
r14
|
|
| 27 | 27 | public byte[] data; |
| 28 | 28 | public String keystring; |
| | 29 | public long cas_unique; |
| 29 | 30 | } |
-
|
r10
|
r14
|
|
| 22 | 22 | import org.slf4j.LoggerFactory; |
| 23 | 23 | |
| | 24 | import static java.lang.Integer.parseInt; |
| | 25 | import static java.lang.String.valueOf; |
| 24 | 26 | import java.nio.charset.CharacterCodingException; |
| 25 | 27 | import java.nio.charset.Charset; |
| 26 | 28 | import java.nio.charset.CharsetEncoder; |
| 27 | 29 | import java.util.Iterator; |
| 28 | | import static java.lang.String.valueOf; |
| 29 | | import static java.lang.Integer.*; |
| 30 | 30 | |
| 31 | 31 | /** |
| … |
… |
|
| 50 | 50 | public boolean verbose; |
| 51 | 51 | |
| 52 | | public static CharsetEncoder ENCODER = Charset.forName("US-ASCII").newEncoder(); |
| | 52 | public static CharsetEncoder ENCODER = Charset.forName("US-ASCII").newEncoder(); |
| 53 | 53 | |
| 54 | 54 | /** |
| … |
… |
|
| 59 | 59 | * Construct the server session handler |
| 60 | 60 | * |
| 61 | | * @param cache the cache to use |
| | 61 | * @param cache the cache to use |
| 62 | 62 | * @param memcachedVersion the version string to return to clients |
| 63 | | * @param verbosity verbosity level for debugging |
| 64 | | * @param idle how long sessions can be idle for |
| | 63 | * @param verbosity verbosity level for debugging |
| | 64 | * @param idle how long sessions can be idle for |
| 65 | 65 | */ |
| 66 | 66 | public ServerSessionHandler(Cache cache, String memcachedVersion, boolean verbosity, int idle) { |
| … |
… |
|
| 68 | 68 | |
| 69 | 69 | this.cache = cache; |
| 70 | | |
| | 70 | |
| 71 | 71 | started = Now(); |
| 72 | 72 | version = memcachedVersion; |
| … |
… |
|
| 141 | 141 | |
| 142 | 142 | ResponseMessage r = new ResponseMessage(); |
| 143 | | if (cmd == Commands.GET) { |
| | 143 | if (cmd == Commands.GET || cmd == Commands.GETS) { |
| 144 | 144 | for (int i = 0; i < cmdKeysSize; i++) { |
| 145 | 145 | MCElement result = get(command.keys.get(i)); |
| 146 | 146 | if (result != null) { |
| 147 | | r.out.putString("VALUE " + result.keystring + " " + result.flags + " " + result.data_length + "\r\n", ENCODER); |
| | 147 | r.out.putString("VALUE " + result.keystring + " " + result.flags + " " + result.data_length + (cmd == Commands.GETS ? " " + result.cas_unique : "") + "\r\n", ENCODER); |
| 148 | 148 | r.out.put(result.data, 0, result.data_length); |
| 149 | 149 | r.out.putString("\r\n", ENCODER); |
| … |
… |
|
| 153 | 153 | r.out.putString("END\r\n", ENCODER); |
| 154 | 154 | } else if (cmd == Commands.SET) { |
| 155 | | r.out.putString(set(command.element), ENCODER); |
| | 155 | String ret = set(command.element); |
| | 156 | if (!command.noreply) |
| | 157 | r.out.putString(ret, ENCODER); |
| | 158 | } else if (cmd == Commands.CAS) { |
| | 159 | String ret = cas(command.cas_key, command.element); |
| | 160 | if (!command.noreply) |
| | 161 | r.out.putString(ret, ENCODER); |
| 156 | 162 | } else if (cmd == Commands.ADD) { |
| 157 | | r.out.putString(add(command.element), ENCODER); |
| | 163 | String ret = add(command.element); |
| | 164 | if (!command.noreply) |
| | 165 | r.out.putString(ret, ENCODER); |
| 158 | 166 | } else if (cmd == Commands.REPLACE) { |
| 159 | | r.out.putString(replace(command.element), ENCODER); |
| | 167 | String ret = replace(command.element); |
| | 168 | if (!command.noreply) |
| | 169 | r.out.putString(ret, ENCODER); |
| 160 | 170 | } else if (cmd == Commands.INCR) { |
| 161 | 171 | r.out.putString(get_add(command.keys.get(0), parseInt(command.keys.get(1))), ENCODER); |
| … |
… |
|
| 207 | 217 | /** |
| 208 | 218 | * Triggered when a session has gone idle. |
| | 219 | * |
| 209 | 220 | * @param session the MINA session |
| 210 | | * @param status the idle status |
| | 221 | * @param status the idle status |
| 211 | 222 | */ |
| 212 | 223 | public void sessionIdle(IoSession session, IdleStatus status) { |
| … |
… |
|
| 217 | 228 | /** |
| 218 | 229 | * Triggered when an exception is caught by the protocol handler |
| | 230 | * |
| 219 | 231 | * @param session the MINA session |
| 220 | | * @param cause the exception |
| | 232 | * @param cause the exception |
| 221 | 233 | */ |
| 222 | 234 | public void exceptionCaught(IoSession session, Throwable cause) { |
| … |
… |
|
| 230 | 242 | * Handle the deletion of an item from the cache. |
| 231 | 243 | * |
| 232 | | * @param key the key for the item |
| | 244 | * @param key the key for the item |
| 233 | 245 | * @param time only delete the element if time (time in seconds) |
| 234 | 246 | * @return the message response |
| … |
… |
|
| 246 | 258 | */ |
| 247 | 259 | protected String add(MCElement e) { |
| 248 | | if (cache.add(e)) return "STORED\r\n"; |
| | 260 | if (cache.add(e) == Cache.StoreResponse.STORED) return "STORED\r\n"; |
| 249 | 261 | else return "NOT_STORED\r\n"; |
| | 262 | } |
| | 263 | |
| | 264 | protected String getStoreResponeString(Cache.StoreResponse storeResponse) { |
| | 265 | switch (storeResponse) { |
| | 266 | case EXISTS: |
| | 267 | return "EXISTS\r\n"; |
| | 268 | case NOT_FOUND: |
| | 269 | return "NOT_FOUND\r\n"; |
| | 270 | case NOT_STORED: |
| | 271 | return "NOT_STORED\r\n"; |
| | 272 | case STORED: |
| | 273 | return "STORED\r\n"; |
| | 274 | } |
| | 275 | return null; |
| 250 | 276 | } |
| 251 | 277 | |
| … |
… |
|
| 257 | 283 | */ |
| 258 | 284 | protected String replace(MCElement e) { |
| 259 | | if (cache.replace(e)) return "STORED\r\n"; |
| 260 | | else return "NOT_STORED\r\n"; |
| | 285 | return getStoreResponeString(cache.replace(e)); |
| 261 | 286 | } |
| 262 | 287 | |
| … |
… |
|
| 268 | 293 | */ |
| 269 | 294 | protected String set(MCElement e) { |
| 270 | | if (cache.set(e)) return "STORED\r\n"; |
| 271 | | else return "NOT_STORED\r\n"; |
| 272 | | } |
| 273 | | |
| 274 | | /** |
| 275 | | * Increment an (integer) element inthe cache |
| | 295 | return getStoreResponeString(cache.set(e)); |
| | 296 | } |
| | 297 | |
| | 298 | /** |
| | 299 | * Check and set an element in the cache |
| | 300 | * |
| | 301 | * @param cas_key |
| | 302 | * @param e the element to set @return the message response string |
| | 303 | */ |
| | 304 | protected String cas(Long cas_key, MCElement e) { |
| | 305 | return getStoreResponeString(cache.cas(cas_key, e)); |
| | 306 | } |
| | 307 | |
| | 308 | /** |
| | 309 | * Increment an (integer) element in the cache |
| | 310 | * |
| 276 | 311 | * @param key the key to increment |
| 277 | 312 | * @param mod the amount to add to the value |
| … |
… |
|
| 289 | 324 | /** |
| 290 | 325 | * Check whether an element is in the cache and non-expired |
| | 326 | * |
| 291 | 327 | * @param key the key for the element to lookup |
| 292 | 328 | * @return whether the element is in the cache and is live |
| … |
… |
|
| 298 | 334 | /** |
| 299 | 335 | * Get an element from the cache |
| | 336 | * |
| 300 | 337 | * @param key the key for the element to lookup |
| 301 | 338 | * @return the element, or 'null' in case of cache miss. |
| … |
… |
|
| 326 | 363 | /** |
| 327 | 364 | * Return runtime statistics |
| | 365 | * |
| 328 | 366 | * @param arg additional arguments to the stats command |
| 329 | 367 | * @return the full command response |
| … |
… |
|
| 372 | 410 | /** |
| 373 | 411 | * Flush all cache entries |
| | 412 | * |
| 374 | 413 | * @return command response |
| 375 | 414 | */ |
| … |
… |
|
| 380 | 419 | /** |
| 381 | 420 | * Flush all cache entries with a timestamp after a given expiration time |
| | 421 | * |
| 382 | 422 | * @param expire the flush time in seconds |
| 383 | 423 | * @return command response |
| … |
… |
|
| 388 | 428 | |
| 389 | 429 | |
| 390 | | |
| 391 | | |
| 392 | | |
| 393 | 430 | } |