import io.proton.common.util.JsonCodec;
import io.proton.shaded.org.apache.commons.io.IOUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermission;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
public class DiskConcurrentHashMap<K,V> implements ConcurrentMap<K,V> { private static final Logger LOG = LoggerFactory.getLogger(DiskConcurrentHashMap.class); private static final JsonCodec<Map> CODEC = new JsonCodec<>(Map.class); private static final ReentrantReadWriteLock threadLock = new ReentrantReadWriteLock(); private static final int MAXIMUM_CAPACITY = 1 << 30;
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl; private final transient String storageFile;
/** * The default initial table capacity. Must be a power of 2 * (i.e., at least 1) and at most MAXIMUM_CAPACITY. */ private static final int DEFAULT_CAPACITY = 16;
/** * The data will be serialized and stored in this directory, * * and the permissions for this directory are readable and writable by all users. */ private static final String DEFAULT_LOCAL_PAT = "/tmp/";
/** * Creates a new, empty map with an initial table size based on the given number of elements. * By default, data will be saved on disk, and consistency will be guaranteed through a global file lock. * When updating the Map, a file write lock will be obtained, which is an exclusive lock that only allows one thread to operate. * When reading from the Map, a file read lock will be obtained, which is a shared lock that allows multiple threads to read simultaneously. * @param initialCapacity the initial capacity. The implementation performs internal sizing to accommodate this many elements, given the specified load factor. * @param storageFile the local cache file, The default directory for this file is /tmp. * When setting it to another directory path, ensure that the file has common read and write permissions if it needs to be shared between multiple JVM processes. * For example, set the file to 777 permissions. */ public DiskConcurrentHashMap(int initialCapacity, String storageFile) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; this.storageFile = isAbsolutePath(storageFile) ? storageFile : String.format("%s/%s", DEFAULT_LOCAL_PAT, storageFile); }
public DiskConcurrentHashMap(String storageFile) { this(DEFAULT_CAPACITY, storageFile); }
@Override public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { if (key == null || mappingFunction == null) { throw new NullPointerException(); }
V existV = get(key); if (existV != null) { return existV; }
try (FileOutputStream out = new FileOutputStream(storageFile); FileChannel channel = FileChannel.open( Paths.get(storageFile), StandardOpenOption.READ, StandardOpenOption.WRITE)) { FileLock lock = channel.tryLock(); threadLock.writeLock().lock(); while (lock == null) { lock = channel.tryLock(); } Map<K, V> existData = loadFromDiskWithLock(); existV = existData.get(key); if (existV != null) { return existV; }
if (existData.size() > sizeCtl) { throw new IllegalArgumentException( String.format( "The current capacity has reached the maximum capacity. Please delete some data before trying to add more. " + "current capacity: %s, maximum capacity: %s ", sizeCtl, existData.size() ) ); }
V val = mappingFunction.apply(key); existData.put(key, val); IOUtils.copy(new ByteArrayInputStream(CODEC.toBytes(existData)), out); Set<PosixFilePermission> permissions = Files.readAttributes(Paths.get(storageFile), PosixFileAttributes.class).permissions(); permissions.addAll(readWritePermissions()); Files.setPosixFilePermissions(Paths.get(storageFile), permissions); LOG.info("Put a new key into the map successfully, release the write lock."); lock.release(); return val; } catch (IOException e) { throw new UncheckedIOException(e); } finally { threadLock.writeLock().unlock(); } }
@Override public int size() { return loadFromDiskWithLock().size(); }
@Override public boolean isEmpty() { return loadFromDiskWithLock().isEmpty(); }
@Override public boolean containsKey(Object key) { return loadFromDiskWithLock().containsKey(key); }
@Override public boolean containsValue(Object value) { return loadFromDiskWithLock().containsKey(value); }
@Override public V get(Object key) { return loadFromDiskWithLock().get(key); }
@Nullable @Override public V put(K key, V value) { throw new UnsupportedOperationException("This method is not currently supported. Please use `computeIfAbsent` as a substitute."); }
@Override public V remove(Object key) { V existV = get(key); if (existV == null) { return null; }
try (FileOutputStream out = new FileOutputStream(storageFile); FileChannel channel = FileChannel.open( Paths.get(storageFile), StandardOpenOption.READ, StandardOpenOption.WRITE)) { FileLock lock = channel.lock(); Map<K, V> existData = loadFromDiskWithLock(); existV = existData.get(key); if (existV == null) { return null; } existV = existData.remove(key); IOUtils.copy(new ByteArrayInputStream(CODEC.toBytes(existData)), out); lock.release(); return existV; } catch (IOException e) { throw new UncheckedIOException(e); } }
@Override public void putAll(@NotNull Map<? extends K, ? extends V> m) { throw new UnsupportedOperationException("This method is not currently supported. Please use `computeIfAbsent` as a substitute."); }
@Override public void clear() { File file = new File(storageFile); if (file.exists()) { file.delete(); } }
@NotNull @Override public Set<K> keySet() { return loadFromDiskWithLock().keySet(); }
@NotNull @Override public Collection<V> values() { return loadFromDiskWithLock().values(); }
@NotNull @Override public Set<Entry<K, V>> entrySet() { return loadFromDiskWithLock().entrySet(); }
@Override public V putIfAbsent(@NotNull K key, V value) { throw new UnsupportedOperationException("This method is not currently supported. Please use `computeIfAbsent` as a substitute."); }
@Override public boolean remove(@NotNull Object key, Object value) { throw new UnsupportedOperationException("Not supported"); }
@Override public boolean replace(@NotNull K key, @NotNull V oldValue, @NotNull V newValue) { throw new UnsupportedOperationException("Not supported"); }
@Override public V replace(@NotNull K key, @NotNull V value) { throw new UnsupportedOperationException("Not supported"); }
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
/** * Determine whether the path is an absolute path. On Linux systems, absolute paths begin with "/". * On Windows systems, absolute paths are determined by whether the path contains a ":" character. * In Windows systems, absolute paths require the use of a colon (:). e.g. c://path. * @param path storage path. * @return is absolute path. */ private boolean isAbsolutePath(String path) { return path.startsWith("/") || path.indexOf(":") > 0; }
private Map<K, V> loadFromDiskWithLock() { try (FileInputStream in = new FileInputStream(storageFile); ByteArrayOutputStream out = new ByteArrayOutputStream()) { FileLock lock = null; threadLock.readLock().lock(); while (lock == null) { lock = in.getChannel().tryLock(0, new File(storageFile).length(), true); } IOUtils.copy(in, out); lock.release(); return CODEC.fromBytes(out.toByteArray()); } catch (IOException e) { // throw new UncheckedIOException("Failed to serialize data from local file.", e); LOG.error("Failed to serialize data from local file, returning default value.", e); return new HashMap<>(); } finally { threadLock.readLock().unlock(); } }
private Map<K, V> loadFromDisk() { try (FileInputStream in = new FileInputStream(storageFile); ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(in, out); return CODEC.fromBytes(out.toByteArray()); } catch (IOException e) { LOG.error("Failed to serialize data from local file, returning default value.", e); return new HashMap<>(); } }
private Set<PosixFilePermission> readWritePermissions() { Set<PosixFilePermission> perms = new HashSet<>(); perms.add(PosixFilePermission.OWNER_WRITE); perms.add(PosixFilePermission.OWNER_READ); perms.add(PosixFilePermission.GROUP_WRITE); perms.add(PosixFilePermission.GROUP_READ); perms.add(PosixFilePermission.OTHERS_WRITE); perms.add(PosixFilePermission.OTHERS_READ); return perms; } }
|