Changeset 1:c8c4ca70ccb5

Show
Ignore:
Timestamp:
03/29/08 16:09:11 (8 months ago)
Author:
ryan@…
Branch:
default
Message:

Some restructuring, cleanup, and documentation

Location:
src/main/java/com/jehiah/memcached
Files:
3 added
2 removed
6 modified

Legend:

Unmodified
Added
Removed
  • src/main/java/com/jehiah/memcached/CommandDecoder.java

    r0 r1  
    1515 
    1616/** 
     17 * MINA MessageDecoderAdapter responsible for parsing inbound lines from the memcached protocol session. 
    1718 */ 
    1819public final class CommandDecoder extends MessageDecoderAdapter { 
     
    2324 
    2425    private static final int WORD_BUFFER_INIT_SIZE = 16; 
     26 
    2527    private static final String SESSION_STATUS = "sessionStatus"; 
    2628 
     29    /** 
     30     * Possible states that the current session is in. 
     31     */ 
    2732    enum SessionState { 
    2833        ERROR, 
    29         SET_WAIT_LINE, 
     34        WAITING_FOR_DATA, 
    3035        READY 
    3136    } 
    3237 
     38    /** 
     39     * Object for holding the current session status. 
     40     */ 
    3341    final class SessionStatus implements Serializable { 
     42        // the state the session is in 
    3443        public SessionState state; 
     44 
     45        // if we are waiting for more data, how much? 
    3546        public int bytesNeeded; 
     47 
     48        // the current working command 
    3649        public CommandMessage cmd; 
    3750 
     
    4760    } 
    4861 
     62    /** 
     63     * Checks the specified buffer is decodable by this decoder. 
     64     * 
     65     * In our case checks the session state to see if we are waiting for data.  If we are, make sure 
     66     * that we actually have all the data we need. 
     67     * 
     68     * @return {@link #OK} if this decoder can decode the specified buffer. 
     69     *         {@link #NOT_OK} if this decoder cannot decode the specified buffer. 
     70     *         {@link #NEED_DATA} if more data is required to determine if the 
     71     *         specified buffer is decodable ({@link #OK}) or not decodable 
     72     *         {@link #NOT_OK}. 
     73     */ 
    4974    public final MessageDecoderResult decodable(IoSession session, ByteBuffer in) { 
    5075        // ask the session for its state, 
    5176        SessionStatus sessionStatus = (SessionStatus) session.getAttribute(SESSION_STATUS); 
    52         if (sessionStatus != null &&  sessionStatus.state == SET_WAIT_LINE) { 
     77        if (sessionStatus != null &&  sessionStatus.state == WAITING_FOR_DATA) { 
    5378            if (in.remaining() <= sessionStatus.bytesNeeded) 
    5479                return MessageDecoderResult.NEED_DATA; 
     
    5782    } 
    5883 
     84    /** 
     85     * Actually decodes inbound data from the memcached protocol session. 
     86     * 
     87     * MINA invokes {@link #decode(IoSession, ByteBuffer, ProtocolDecoderOutput)} 
     88     * method with read data, and then the decoder implementation puts decoded 
     89     * messages into {@link ProtocolDecoderOutput}. 
     90     * 
     91     * @return {@link #OK} if finished decoding messages successfully. 
     92     *         {@link #NEED_DATA} if you need more data to finish decoding current message. 
     93     *         {@link #NOT_OK} if you cannot decode current message due to protocol specification violation. 
     94     * 
     95     * @throws Exception if the read data violated protocol specification 
     96     */ 
    5997    public final MessageDecoderResult decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { 
    6098        SessionStatus sessionStatus = (SessionStatus) session.getAttribute(SESSION_STATUS); 
    6199        SessionStatus returnedSessionStatus; 
    62         if (sessionStatus != null && sessionStatus.state == SET_WAIT_LINE) { 
     100        if (sessionStatus != null && sessionStatus.state == WAITING_FOR_DATA) { 
    63101            if (in.remaining() < sessionStatus.bytesNeeded) 
    64102                return MessageDecoderResult.NEED_DATA; 
     
    108146 
    109147 
     148    /** 
     149     * Process an individual completel protocol line and either passes the command for processing by the 
     150     * session handler, or (in the case of SET-type commands) partially parses the command and sets the session into 
     151     * a state to wait for additional data. 
     152     * @param parts the (originally space separated) parts of the command 
     153     * @param session the MINA IoSession 
     154     * @param out the MINA protocol decoder output to pass our command on to 
     155     * @return the session status we want to set the session to 
     156     */ 
    110157    private SessionStatus processLine(List<String> parts, IoSession session, ProtocolDecoderOutput out) { 
    111158        if (parts.isEmpty()) 
     
    134181            cmd.element.data_length = size; 
    135182 
    136             return new SessionStatus(SET_WAIT_LINE, size, cmd); 
     183            return new SessionStatus(WAITING_FOR_DATA, size, cmd); 
    137184 
    138185        } else if (cmd.cmd == Commands.GET || 
     
    155202    } 
    156203 
     204    /** 
     205     * Handles the continuation of a SET/ADD/REPLACE command with the data it was waiting for. 
     206     * 
     207     * @param session the MINA IoSession 
     208     * @param out the MINA protocol decoder output which we signal with the completed command 
     209     * @param state the current session status (unused) 
     210     * @param remainder the bytes picked up 
     211     * @return the new status to set the session to 
     212     */ 
    157213    private SessionStatus continueSet(IoSession session, ProtocolDecoderOutput out, SessionStatus state, byte[] remainder) { 
    158214        state.cmd.element.data = remainder; 
     
    163219    } 
    164220 
     221    /** 
     222     * @return the current time in seconds 
     223     */ 
    165224    public final int Now() { 
    166225        return (int) (System.currentTimeMillis() / 1000); 
  • src/main/java/com/jehiah/memcached/CommandMessage.java

    r0 r1  
    1313import java.util.ArrayList; 
    1414 
    15 public class CommandMessage implements Serializable { 
    16     // should be byte buffers? 
     15/** 
     16 * The payload object holding the parsed message. 
     17 */ 
     18public final class CommandMessage implements Serializable { 
    1719    public String cmd; 
    1820    public MCElement element; 
  • src/main/java/com/jehiah/memcached/LRUCache.java

    r0 r1  
    3131 * <li> cache items are expelled when the total maximum size of the cache is 
    3232 * reached 
     33 * <li> a ceiling parameter is available to provide a certain amount of reserved room 
     34 * in the cache 
    3335 * <li> unusual: not built to handle the case of user input affecting the 
    3436 * data; only handles case of changes to the datastore eventually refreshing 
    3537 * the cache. Thus, this cache is not suitable for the case in which user 
    36  * input must be reflected. 
     38 * input must be reflected.  That is: all changes to data must happen through "puts" 
    3739 * </ul> 
    3840 * <p/> 
    3941 * <p/> 
    40  * Source: http://www.javapractices.com/Topic118.cjp 
     42 * Based on (heavily altered) code from original source: http://www.javapractices.com/Topic118.cjp 
    4143 */ 
    4244public final class LRUCache<ID_TYPE, ITEM_TYPE> { 
    4345 
    44     private final Map<ID_TYPE, CacheEntry<ITEM_TYPE>> fItems; 
     46    /** 
     47     * Map containing the actual storage 
     48     */ 
     49    private final Map<ID_TYPE, CacheEntry<ITEM_TYPE>> items; 
    4550 
    4651    /** 
     
    7883        this.maximumSize = maximumSize; 
    7984        this.ceilingSize = ceilingSize; 
    80         fItems = new LinkedHashMap<ID_TYPE, CacheEntry<ITEM_TYPE>>(INITIAL_TABLE_SIZE) { 
     85 
     86        /** 
     87         * Creates a linked hash map which expels old elements on declared criterion 
     88         */ 
     89        items = new LinkedHashMap<ID_TYPE, CacheEntry<ITEM_TYPE>>(INITIAL_TABLE_SIZE) { 
    8190            protected boolean removeEldestEntry(Map.Entry<ID_TYPE, CacheEntry<ITEM_TYPE>> eldest) { 
    8291                if (size + ceilingSize > maximumSize || size() > maximumItems) { 
     
    104113        startRead(); 
    105114        try { 
    106             return fItems.containsKey(aId); 
     115            return items.containsKey(aId); 
    107116        } 
    108117        finally { 
     
    125134        ITEM_TYPE result; 
    126135        try { 
    127             if (fItems.containsKey(aId)) { 
    128                 result = fItems.get(aId).item; 
     136            if (items.containsKey(aId)) { 
     137                result = items.get(aId).item; 
    129138                if (result == null) { 
    130139                    throw new IllegalStateException("Stored item should not be null. Id:" + aId); 
     
    156165        startWrite(); 
    157166        try { 
    158             fItems.put(aId, new CacheEntry<ITEM_TYPE>(item_size, aItem)); 
     167            items.put(aId, new CacheEntry<ITEM_TYPE>(item_size, aItem)); 
    159168            size += item_size; 
    160169        } 
     
    165174 
    166175    /** 
    167      * Start from beginning, and remove all items from the cache; if cache is 
    168      * disabled, do nothing. 
    169      * <p/> 
    170      * Forces a re-population of all items into the cache. 
    171      */ 
    172     public void clear() { 
     176     * Remove an entry from the cache 
     177     * @param key the key for the entry 
     178     */ 
     179    public void remove(ID_TYPE key) { 
    173180        startWrite(); 
    174181        try { 
    175             fItems.clear(); 
    176             size = 0; 
    177         } 
    178         finally { 
    179             finishWrite(); 
    180         } 
    181     } 
    182  
    183     public Set<ID_TYPE> keys() { 
    184         startRead(); 
    185         try { 
    186             return fItems.keySet(); 
    187         } finally { 
    188             finishRead(); 
    189         } 
    190     } 
    191  
    192     public void remove(ID_TYPE keystring) { 
    193         startWrite(); 
    194         try { 
    195             CacheEntry<ITEM_TYPE> item = fItems.get(keystring); 
     182            CacheEntry<ITEM_TYPE> item = items.get(key); 
    196183            if (item != null) { 
    197                 fItems.remove(keystring); 
     184                items.remove(key); 
    198185                size -= item.size; 
    199186            } 
     
    204191 
    205192 
     193    /** 
     194     * Start from beginning, and remove all items from the cache; if cache is 
     195     * disabled, do nothing. 
     196     * <p/> 
     197     * Forces a re-population of all items into the cache. 
     198     */ 
     199    public void clear() { 
     200        startWrite(); 
     201        try { 
     202            items.clear(); 
     203            size = 0; 
     204        } 
     205        finally { 
     206            finishWrite(); 
     207        } 
     208    } 
     209 
     210    /** 
     211     * @return the set of all keys in the cache 
     212     */ 
     213    public Set<ID_TYPE> keys() { 
     214        startRead(); 
     215        try { 
     216            return items.keySet(); 
     217        } finally { 
     218            finishRead(); 
     219        } 
     220    } 
     221 
     222    /** 
     223     * @return the number of entries in the cache 
     224     */ 
     225    public long count() { 
     226        startRead(); 
     227        try { 
     228            return items.size(); 
     229        } finally { 
     230            finishRead(); 
     231        } 
     232    } 
     233 
     234    /** 
     235     * @return the maximum number of items in the cache 
     236     */ 
     237    public int getMaximumItems() { 
     238        return maximumItems; 
     239    } 
     240 
     241    /** 
     242     * @return the size (in bytes) of all entries in the cache. 
     243     */ 
     244    public long getSize() { 
     245        return size; 
     246    } 
     247 
     248    /** 
     249     * @return the maximum capacity (in bytes) of the cache 
     250     */ 
     251    public long getMaximumSize() { 
     252        return maximumSize; 
     253    } 
     254 
     255    /** 
     256     * @return the reserved headroom/ceiling size (in bytes) 
     257     */ 
     258    public long getCeilingSize() { 
     259        return ceilingSize; 
     260    } 
     261 
    206262 
    207263    /** 
     
    218274    } 
    219275 
     276    /** 
     277     * Blocks of code in which the contents of fItems and fTimePlacedIntoCache 
     278     * are examined in any way must be surrounded by calls to <code>startRead</code> 
     279     * and <code>finishRead</code>. See documentation for ReadWriteLock. 
     280     * <p/> 
     281     * Translates the InterruptedException into a generic DataAccessException, to 
     282     * protect the higher layers from implementation details. 
     283     */ 
    220284    private void finishRead() { 
    221285        readWriteLock.readLock().unlock(); 
     
    236300    } 
    237301 
     302    /** 
     303     * Blocks of code in which the contents of fItems and fTimePlacedIntoCache 
     304     * are changed in any way must be surrounded by calls to <code>startWrite</code> and 
     305     * <code>finishWrite</code>. See documentation for ReadWriteLock. 
     306     * <p/> 
     307     * Translates the InterruptedException into a generic DataAccessException, to 
     308     * protect the higher layers from implementation details. 
     309     */ 
    238310    private void finishWrite() { 
    239311        readWriteLock.writeLock().unlock(); 
    240312    } 
    241313 
    242     // 
    243     public long count() { 
    244         startRead(); 
    245         try { 
    246             return fItems.size(); 
    247         } finally { 
    248             finishRead(); 
    249         } 
    250     } 
    251  
    252     public long getSize() { 
    253         return size; 
    254     } 
    255  
    256     public long getMaximumSize() { 
    257         return maximumSize; 
    258     } 
    259  
    260     public long getCeilingSize() { 
    261         return ceilingSize; 
    262     } 
    263  
    264     public int getMaximumItems() { 
    265         return maximumItems; 
    266     } 
    267314 
    268315} 
  • src/main/java/com/jehiah/memcached/MCElement.java

    r0 r1  
    1818    public byte[] data; 
    1919    public String keystring; 
    20  
    21     /* 
    22       * Each item sent by the server looks like this: 
    23       VALUE <key> <flags> <bytes>\r\n 
    24       <data block>\r\n 
    25       */ 
    26     public String toString() { 
    27         return new StringBuffer().append("VALUE ").append(this.keystring).append(" ").append(this.flags).append(" ").append(this.data_length).append("\r\n").append(this.data).append("\r\n").toString(); 
    28 //              return "VALUE " + this.keystring + " " + this.flags + " " +String.valueOf(this.data_length)+ "\r\n"+ this.data + "\r\n"; 
    29     } 
    30  
    3120} 
  • src/main/java/com/jehiah/memcached/Main.java

    r0 r1  
    3939 
    4040        if (cmdline.hasOption("help") || cmdline.hasOption("h")) { 
    41             System.out.println("Memcached Version " + MemCacheD.memcachedVersion); 
     41            System.out.println("Memcached Version " + MemCacheDaemon.memcachedVersion); 
    4242            System.out.println("http://jehiah.com/projects/memcached\n"); 
    4343 
     
    4848 
    4949        if (cmdline.hasOption("V")) { 
    50             System.out.println("Memcached Version " + MemCacheD.memcachedVersion); 
     50            System.out.println("Memcached Version " + MemCacheDaemon.memcachedVersion); 
    5151            return; 
    5252        } 
     
    118118 
    119119        // create daemon and start it 
    120         new MemCacheD(addr, port, max_size, maxBytes, idle, verbose).start(); 
     120        MemCacheDaemon daemon = new MemCacheDaemon(); 
     121        LRUCacheDelegate cacheDelegate = new LRUCacheDelegate(max_size, max_size, 1024000); 
     122        daemon.setCacheDelegate(cacheDelegate); 
     123        daemon.start(); 
    121124    } 
    122125 
  • src/main/java/com/jehiah/memcached/ServerSessionHandler.java

    r0 r1  
    3838    public int idle_limit; 
    3939    public boolean verbose; 
    40     protected MCCache data; 
     40    protected Cache cache; 
    4141 
    4242    public static CharsetEncoder ENCODER  = Charset.forName("US-ASCII").newEncoder(); 
    4343 
    44     public ServerSessionHandler(MCCache cache, String memcachedVersion, boolean verbosity, int idle) { 
     44    public ServerSessionHandler(Cache cache, String memcachedVersion, boolean verbosity, int idle) { 
    4545        initStats(); 
    46         this.data = cache; 
     46        this.cache = cache; 
    4747 
    4848        this.started = Now(); 
     
    175175        if (is_there(keystring)) { 
    176176            if (time != 0) { 
    177                 MCElement el = this.data.get(keystring); 
     177                MCElement el = this.cache.get(keystring); 
    178178                if (el.expire == 0 || el.expire > (Now() + time)) { 
    179179                    el.expire = Now() + time; // update the expire time 
    180                     this.data.put(keystring, el); 
     180                    this.cache.put(keystring, el); 
    181181                }// else it expire before the time we were asked to expire it 
    182182            } else { 
    183                 this.data.remove(keystring); // just remove it 
     183                this.cache.remove(keystring); // just remove it 
    184184            } 
    185185            return "DELETED\r\n"; 
     
    209209    public String set(MCElement e) { 
    210210        set_cmds += 1;//update stats 
    211         this.data.put(e.keystring, e); 
     211        this.cache.put(e.keystring, e); 
    212212        return "STORED\r\n"; 
    213213    } 
    214214 
    215215    public String get_add(String keystring, int mod) { 
    216         MCElement e = this.data.get(keystring); 
     216        // TODO make this threadsafe by cooperating more directly with the cache 
     217        MCElement e = this.cache.get(keystring); 
    217218        if (e == null) { 
    218219            get_misses += 1;//update stats 
     
    230231        e.data = valueOf(old_val).getBytes(); // toString 
    231232        e.data_length = e.data.length; 
    232         this.data.put(e.keystring, e); // save new value 
     233        this.cache.put(e.keystring, e); // save new value 
    233234        return valueOf(old_val) + "\r\n"; // return new value 
    234235    } 
    235236 
    236237    /* 
    237       * this.data.containsKey() would work except it doesn't check the expire time 
     238      * this.cache.containsKey() would work except it doesn't check the expire time 
    238239      */ 
    239240    public boolean is_there(String keystring) { 
    240         MCElement e = this.data.get(keystring); 
     241        MCElement e = this.cache.get(keystring); 
    241242        return e != null && !(e.expire != 0 && e.expire < Now()); 
    242243    } 
     
    244245    public MCElement get(String keystring) { 
    245246        get_cmds += 1;//updates stats 
    246         MCElement e = this.data.get(keystring); 
     247        MCElement e = this.cache.get(keystring); 
    247248        if (e == null) { 
    248249            get_misses += 1;//update stats 
     
    251252        if (e.expire != 0 && e.expire < Now()) { 
    252253            get_misses += 1;//update stats 
     254 
     255            // TODO shouldn't this actually remove the item from cache since it's expired? 
    253256            return null; 
    254257        } 
     
    273276 
    274277        if (arg.equals("keys")) { 
    275             Iterator itr = this.data.keys(); 
     278            Iterator itr = this.cache.keys(); 
    276279            while (itr.hasNext()) { 
    277280                builder.append("STAT key ").append(itr.next()).append("\r\n"); 
     
    291294        builder.append("STAT time ").append(valueOf(Now())).append("\r\n"); 
    292295        builder.append("STAT uptime ").append(valueOf(Now() - this.started)).append("\r\n"); 
    293         builder.append("STAT cur_items ").append(valueOf(this.data.count())).append("\r\n"); 
    294         builder.append("STAT limit_maxbytes ").append(valueOf(this.data.maxSize())).append("\r\n"); 
    295         builder.append("STAT current_bytes ").append(valueOf(this.data.size())).append("\r\n"); 
     296        builder.append("STAT cur_items ").append(valueOf(this.cache.count())).append("\r\n"); 
     297        builder.append("STAT limit_maxbytes ").append(valueOf(this.cache.maxSize())).append("\r\n"); 
     298        builder.append("STAT current_bytes ").append(valueOf(this.cache.size())).append("\r\n"); 
    296299        builder.append("STAT free_bytes ").append(valueOf(Runtime.getRuntime().freeMemory())).append("\r\n"); 
    297300 
     
    313316 
    314317    public String flush_all(int expire) { 
    315         this.data.flushAll(); 
     318        this.cache.flushAll(); 
    316319 
    317320        return "OK\r\n";