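In some situations, such as caching data shared between applications, anything that crosses application boundaries calls for an external storage medium, for example a database or Redis.

If the data to be passed and stored between applications is very simple, introducing an external storage medium is clearly a costly option, so a common choice is to write a cache file to disk on the current machine.

In other words, a file on the local file system serves as the storage medium between the different business systems.

If the workload is simple and the applications use the file at different times, no extra handling is needed and the file can be accessed directly. If access is frequent, however, concurrency has to be considered.

For example, what happens if several threads write the file at the same time, or if one thread is writing while another is reading? And that is only the simple multi-threaded case.

Extended to multiple applications, which in Java means multiple JVMs, it becomes a cross-process problem: what if several processes write the file at the same time, or one process is writing while another is reading?

Coordinating processes is far more troublesome than coordinating threads, since threads at least live in the same JVM. Using the disk as storage avoids the cost of introducing an external storage medium, but it seems to bring in the problem of multi-process concurrency instead.

Does that mean a service like ZooKeeper (ZK) has to be introduced after all?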

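Java itself provides a facility for this requirement. The core idea is still locking, and the tool is FileLock.

FileLock is a file lock that works at the process level: it controls concurrent access to the same file by different programs (JVMs).

FileLock was introduced in Java 1.4. By locking a writable file (w), it ensures that only one process at a time holds the lock and can therefore access the file. A process that fails to get the lock can either block and wait or go on to do something else. This mechanism lets the processes access the file one after another.

This property also makes file locks a good fit for scenarios where nothing needs to be written to the file at all and the goal is simply to run without interference from other programs.

File locks are obtained through java.nio.channels.FileChannel, a class in NIO.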
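As a minimal sketch of the mechanism described above, the following class takes an exclusive lock through a FileChannel before appending to a file. The class name `ExclusiveLockSketch` and the method `appendLine` are hypothetical names chosen for illustration; only the `FileChannel` and `FileLock` calls come from the JDK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ExclusiveLockSketch {

    /** Appends one line to the file while holding an exclusive lock on the whole file. */
    static void appendLine(Path file, String line) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             // lock() blocks until no other process holds a conflicting lock
             FileLock lock = channel.lock()) {
            channel.position(channel.size()); // move to the end of the file
            ByteBuffer buf = ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Leaving the try block releases the lock first and then closes the channel.
    }
}
```

Two processes that both call `appendLine` on the same path should take turns, since each one waits in `lock()` until the other has left its try block.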

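Exclusive locks and shared locks

  • Exclusive lock: if one process holds an exclusive lock on a file, no other process can obtain an exclusive or a shared lock on the same file until the exclusive lock is released.
  • Shared lock: if one process holds a shared lock on a file, other processes can still obtain a shared lock on the same file or on part of it, but they cannot obtain an exclusive lock.

While a.txt is under an exclusive lock, other processes can neither read nor write it.
While a.txt is under a shared lock, other processes can read it but cannot write it.

In fc.tryLock(position, size, shared), passing true as the third argument requests a shared lock.

So when a process holds a file lock and wants other processes to be able to read the file in the meantime, it can use a shared lock. Writes by the others are not allowed.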
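The three-argument call can be sketched as follows: a reader requests a shared lock over the whole file and reads the content only if the lock is granted. `SharedLockSketch` and `readShared` are hypothetical names used for illustration, and the sketch assumes the file is small enough to fit in one buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Optional;

public class SharedLockSketch {

    /** Reads the file under a shared lock; returns empty if another process holds an exclusive lock. */
    static Optional<String> readShared(Path file) {
        try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
            // third argument true = shared lock; a read-only channel is enough for it
            FileLock lock = fc.tryLock(0, Long.MAX_VALUE, true);
            if (lock == null) {
                return Optional.empty();
            }
            try {
                ByteBuffer buf = ByteBuffer.allocate((int) fc.size());
                while (buf.hasRemaining() && fc.read(buf) >= 0) {
                    // keep reading until the buffer is full or end-of-file is reached
                }
                return Optional.of(new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8));
            } finally {
                lock.release();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Several reader processes can hold this shared lock at the same time, while a writer asking for an exclusive lock has to wait until all of them have released it.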

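Acquiring the lock with tryLock

  • lock() blocks. Called without arguments, it locks a range that keeps covering the file as the file grows.
  • tryLock() does not block; it returns null when the lock cannot be acquired.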
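The difference between the two calls can be illustrated with a small polling helper: it uses the non-blocking tryLock(), sleeps between attempts, and gives up after a timeout instead of blocking indefinitely like lock(). `TryLockSketch`, `runWhenFree` and the 50 ms pause are assumptions made for this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TryLockSketch {

    /** Runs the task under an exclusive lock; returns false if the lock stays busy until the timeout. */
    static boolean runWhenFree(Path file, long timeoutMillis, Runnable task) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            while (true) {
                FileLock lock = channel.tryLock(); // null while another process holds the lock
                if (lock != null) {
                    try {
                        task.run();
                        return true;
                    } finally {
                        lock.release();
                    }
                }
                if (System.currentTimeMillis() >= deadline) {
                    return false; // give up and let the caller do something else
                }
                Thread.sleep(50); // pause between attempts instead of spinning
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller might write `TryLockSketch.runWhenFree(path, 1000, () -> refreshCache())`, where `refreshCache` stands for whatever work needs the lock.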

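Lifetime of a FileLock: the lock stays valid until FileLock.release() or Channel.close() is called, or until the JVM exits.
boolean java.nio.channels.FileLock.overlaps(long position, long size) returns true if the lock overlaps the given region and false if the lock's region and the given region do not overlap.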
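Both points can be checked with a short sketch that locks bytes 0 to 9 of a file, asks overlaps() about two regions, and inspects isValid() before and after release(). `LockLifetimeSketch` and `describe` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockLifetimeSketch {

    /** Locks bytes 0..9 and reports what overlaps() and isValid() return. */
    static String describe(Path file) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock(0, 10, false); // position 0, size 10, exclusive
            if (lock == null) {
                return "lock held by another process";
            }
            boolean sharesBytes = lock.overlaps(5, 10);   // region 5..14 shares bytes 5..9
            boolean adjacentOnly = lock.overlaps(10, 5);  // region 10..14 starts right after the lock
            boolean validWhileHeld = lock.isValid();
            lock.release();
            boolean validAfterRelease = lock.isValid();
            return String.format("overlaps(5,10)=%b overlaps(10,5)=%b valid=%b afterRelease=%b",
                    sharesBytes, adjacentOnly, validWhileHeld, validAfterRelease);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

When no other process holds the lock, the returned text is `overlaps(5,10)=true overlaps(10,5)=false valid=true afterRelease=false`.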

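The effect of a file lock depends on the operating system, which implements it underneath. On Windows, for example, processes cannot read and write a locked file at the same time, whereas on Linux different processes can still read and write the same file at the same time, because the lock is advisory there and only holds back processes that also ask for it.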

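Overall, a few details need attention:

  • If both processes are writers, both can use OutputStream;
  • With one reader and one writer, InputStream and OutputStream cannot achieve the locking effect, because the channel of an input stream is read-only and cannot take an exclusive lock. After switching to RandomAccessFile, the reader and writer processes must also open the file in the same mode ("rw", for instance) for locking to succeed.
  • Also, the difference between lock() and tryLock() is that lock() blocks and tryLock() does not.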
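One way to see why the stream classes fall short in the reader/writer case is to probe which channels accept an exclusive lock. The sketch below compares a RandomAccessFile opened in "rw" mode with a FileInputStream; `ChannelModeSketch`, `canLockExclusively` and `compare` are hypothetical names, and the result assumes no other process holds the lock at that moment.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Path;

public class ChannelModeSketch {

    /** Returns true if an exclusive lock can be taken on this channel right now. */
    static boolean canLockExclusively(FileChannel channel) throws IOException {
        try {
            FileLock lock = channel.tryLock();
            if (lock != null) {
                lock.release();
            }
            return lock != null;
        } catch (NonWritableChannelException e) {
            return false; // exclusive locks need a channel that was opened for writing
        }
    }

    /** Compares a read-write RandomAccessFile channel with a FileInputStream channel. */
    static String compare(Path file) {
        try {
            boolean viaRandomAccessFile;
            boolean viaInputStream;
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
                viaRandomAccessFile = canLockExclusively(raf.getChannel());
            }
            try (FileInputStream in = new FileInputStream(file.toFile())) {
                viaInputStream = canLockExclusively(in.getChannel());
            }
            return "RandomAccessFile(rw)=" + viaRandomAccessFile + " FileInputStream=" + viaInputStream;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The input stream's channel can still request a shared lock, which is what a pure reader would do.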

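A more important detail

The FileLock provided by Java is meant for use between JVMs. On Linux it comes down to calling fcntl so that the kernel locks the file. The lock only takes effect across JVMs; it does nothing to coordinate different threads inside the same JVM.
Inside one JVM with multiple threads, once the first thread holds the lock (shared or exclusive), a second thread that tries to lock the same region gets an OverlappingFileLockException straight away.
This means that threads within one JVM and multiple JVMs are two separate cases, and each needs its own lock.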
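The same-JVM behaviour can be reproduced with two channels on the same file, as in this minimal sketch. `SameJvmOverlapSketch` and `secondLockRejected` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SameJvmOverlapSketch {

    /** Returns true if a second lock request from the same JVM is rejected with an exception. */
    static boolean secondLockRejected(Path file) {
        try (FileChannel first = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileChannel second = FileChannel.open(file, StandardOpenOption.WRITE);
             FileLock held = first.lock()) {
            try {
                // Same JVM, same region: this throws instead of returning null.
                FileLock again = second.tryLock();
                if (again != null) {
                    again.release();
                }
                return false;
            } catch (OverlappingFileLockException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the exception is unchecked and is thrown rather than reported as a null return, a retry loop around tryLock() does not help between threads; an in-JVM lock has to serialize them first.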

Encapsulating it

Taking the thread-safe Map as a model, we can implement a disk-backed, thread-safe Map that hides the underlying storage medium, for example:

import io.proton.common.util.JsonCodec;
import io.proton.shaded.org.apache.commons.io.IOUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermission;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class DiskConcurrentHashMap<K,V> implements ConcurrentMap<K,V> {
private static final Logger LOG = LoggerFactory.getLogger(DiskConcurrentHashMap.class);
private static final JsonCodec<Map> CODEC = new JsonCodec<>(Map.class);
private static final ReentrantReadWriteLock threadLock = new ReentrantReadWriteLock();
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* Upper bound on the number of entries. It is derived from the requested
* initial capacity in the constructor, and computeIfAbsent rejects new keys
* once the map stored on disk has grown beyond this value.
*/
private transient volatile int sizeCtl;
private final transient String storageFile;

/**
* The default initial table capacity. Must be a power of 2
* (i.e., at least 1) and at most MAXIMUM_CAPACITY.
*/
private static final int DEFAULT_CAPACITY = 16;

/**
* The data will be serialized and stored in this directory,
* * and the permissions for this directory are readable and writable by all users.
*/
private static final String DEFAULT_LOCAL_PAT = "/tmp/";

/**
* Creates a new, empty map with an initial table size based on the given number of elements.
* By default, data will be saved on disk, and consistency will be guaranteed through a global file lock.
* When updating the Map, an exclusive file write lock is obtained, so only one process can operate at a time.
* When reading from the Map, a shared file read lock is obtained, so multiple processes can read simultaneously.
* @param initialCapacity the initial capacity. The implementation performs internal sizing to accommodate this many elements, given the specified load factor.
* @param storageFile the local cache file, The default directory for this file is /tmp.
* When setting it to another directory path, ensure that the file has common read and write permissions if it needs to be shared between multiple JVM processes.
* For example, set the file to 777 permissions.
*/
public DiskConcurrentHashMap(int initialCapacity, String storageFile) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
this.storageFile = isAbsolutePath(storageFile) ? storageFile : String.format("%s/%s", DEFAULT_LOCAL_PAT, storageFile);
}

public DiskConcurrentHashMap(String storageFile) {
this(DEFAULT_CAPACITY, storageFile);
}

@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
if (key == null || mappingFunction == null) {
throw new NullPointerException();
}

V existV = get(key);
if (existV != null) {
return existV;
}

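// Take the in-JVM lock first: a FileLock is held on behalf of the whole JVM, so threads of this
// process must be serialized here. Locking before the try block also guarantees that the finally
// clause only unlocks a lock that is actually held.
threadLock.writeLock().lock();
// Open the channel without truncating. `new FileOutputStream(storageFile)` would empty the file
// as soon as it is opened, before the existing entries can be read.
try (FileChannel channel = FileChannel.open(
Paths.get(storageFile), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
// Block until the exclusive cross-process lock is granted instead of spinning on tryLock().
FileLock lock = channel.lock()) {
// Read through the locked channel itself. Requesting a second lock from this JVM would throw
// OverlappingFileLockException, and on some systems closing another stream on the same file
// releases every lock the JVM holds on it.
java.nio.ByteBuffer current = java.nio.ByteBuffer.allocate((int) channel.size());
while (current.hasRemaining() && channel.read(current) >= 0) {
// keep reading until the buffer is full or end-of-file is reached
}
Map<K, V> existData = new HashMap<>();
if (current.capacity() > 0) {
existData = CODEC.fromBytes(current.array());
}
existV = existData.get(key);
if (existV != null) {
return existV;
}

if (existData.size() > sizeCtl) {
throw new IllegalArgumentException(
String.format(
"The current capacity has reached the maximum capacity. Please delete some data before trying to add more. " +
"current capacity: %s, maximum capacity: %s ",
existData.size(),
sizeCtl
)
);
}

V val = mappingFunction.apply(key);
existData.put(key, val);
// Replace the file content in place while the exclusive lock is still held.
java.nio.ByteBuffer updated = java.nio.ByteBuffer.wrap(CODEC.toBytes(existData));
channel.truncate(0);
while (updated.hasRemaining()) {
channel.write(updated);
}
Set<PosixFilePermission> permissions =
Files.readAttributes(Paths.get(storageFile), PosixFileAttributes.class).permissions();
permissions.addAll(readWritePermissions());
Files.setPosixFilePermissions(Paths.get(storageFile), permissions);
LOG.info("Put a new key into the map successfully, release the write lock.");
return val;
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
threadLock.writeLock().unlock();
}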
}

@Override
public int size() {
return loadFromDiskWithLock().size();
}

@Override
public boolean isEmpty() {
return loadFromDiskWithLock().isEmpty();
}

@Override
public boolean containsKey(Object key) {
return loadFromDiskWithLock().containsKey(key);
}

@Override
public boolean containsValue(Object value) {
return loadFromDiskWithLock().containsValue(value);
}

@Override
public V get(Object key) {
return loadFromDiskWithLock().get(key);
}

@Nullable
@Override
public V put(K key, V value) {
throw new UnsupportedOperationException("This method is not currently supported. Please use `computeIfAbsent` as a substitute.");
}

@Override
public V remove(Object key) {
V existV = get(key);
if (existV == null) {
return null;
}

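// In-JVM lock first, then the cross-process lock, in the same order as computeIfAbsent.
threadLock.writeLock().lock();
// Open without truncating: `new FileOutputStream(storageFile)` would empty the file before it is read.
try (FileChannel channel = FileChannel.open(
Paths.get(storageFile), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
FileLock lock = channel.lock()) {
// Read through the locked channel; a second lock request from this JVM would throw
// OverlappingFileLockException.
java.nio.ByteBuffer current = java.nio.ByteBuffer.allocate((int) channel.size());
while (current.hasRemaining() && channel.read(current) >= 0) {
// keep reading until the buffer is full or end-of-file is reached
}
Map<K, V> existData = new HashMap<>();
if (current.capacity() > 0) {
existData = CODEC.fromBytes(current.array());
}
existV = existData.remove(key);
if (existV == null) {
return null;
}
// Write the remaining entries back while the exclusive lock is still held.
java.nio.ByteBuffer updated = java.nio.ByteBuffer.wrap(CODEC.toBytes(existData));
channel.truncate(0);
while (updated.hasRemaining()) {
channel.write(updated);
}
return existV;
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
threadLock.writeLock().unlock();
}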
}

@Override
public void putAll(@NotNull Map<? extends K, ? extends V> m) {
throw new UnsupportedOperationException("This method is not currently supported. Please use `computeIfAbsent` as a substitute.");
}

@Override
public void clear() {
File file = new File(storageFile);
if (file.exists()) {
file.delete();
}
}

@NotNull
@Override
public Set<K> keySet() {
return loadFromDiskWithLock().keySet();
}

@NotNull
@Override
public Collection<V> values() {
return loadFromDiskWithLock().values();
}

@NotNull
@Override
public Set<Entry<K, V>> entrySet() {
return loadFromDiskWithLock().entrySet();
}

@Override
public V putIfAbsent(@NotNull K key, V value) {
throw new UnsupportedOperationException("This method is not currently supported. Please use `computeIfAbsent` as a substitute.");
}

@Override
public boolean remove(@NotNull Object key, Object value) {
throw new UnsupportedOperationException("Not supported");
}

@Override
public boolean replace(@NotNull K key, @NotNull V oldValue, @NotNull V newValue) {
throw new UnsupportedOperationException("Not supported");
}

@Override
public V replace(@NotNull K key, @NotNull V value) {
throw new UnsupportedOperationException("Not supported");
}

/**
* Returns a power of two table size for the given desired capacity.
* See Hackers Delight, sec 3.2
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

/**
* Determine whether the path is an absolute path. On Linux systems, absolute paths begin with "/".
* On Windows systems, absolute paths are determined by whether the path contains a ":" character.
* In Windows systems, absolute paths require the use of a colon (:). e.g. c://path.
* @param path storage path.
* @return is absolute path.
*/
private boolean isAbsolutePath(String path) {
return path.startsWith("/") || path.indexOf(":") > 0;
}

private Map<K, V> loadFromDiskWithLock() {
// A FileLock is held on behalf of the whole JVM: two threads of this process locking overlapping
// regions would hit OverlappingFileLockException, so readers inside this JVM are serialized too.
// The lock is taken before the try block so that the finally clause never unlocks a lock it does not hold
// (for example when the file does not exist yet and the stream cannot be opened).
threadLock.writeLock().lock();
try (FileInputStream in = new FileInputStream(storageFile);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// Blocking shared lock over the whole file; this replaces the busy loop around tryLock().
FileLock lock = in.getChannel().lock(0, Long.MAX_VALUE, true)) {
IOUtils.copy(in, out);
return CODEC.fromBytes(out.toByteArray());
} catch (IOException e) {
LOG.error("Failed to deserialize data from local file, returning an empty map.", e);
return new HashMap<>();
} finally {
threadLock.writeLock().unlock();
}
}

private Map<K, V> loadFromDisk() {
try (FileInputStream in = new FileInputStream(storageFile);
ByteArrayOutputStream out = new ByteArrayOutputStream()) {
IOUtils.copy(in, out);
return CODEC.fromBytes(out.toByteArray());
} catch (IOException e) {
LOG.error("Failed to serialize data from local file, returning default value.", e);
return new HashMap<>();
}
}

private Set<PosixFilePermission> readWritePermissions() {
Set<PosixFilePermission> perms = new HashSet<>();
perms.add(PosixFilePermission.OWNER_WRITE);
perms.add(PosixFilePermission.OWNER_READ);
perms.add(PosixFilePermission.GROUP_WRITE);
perms.add(PosixFilePermission.GROUP_READ);
perms.add(PosixFilePermission.OTHERS_WRITE);
perms.add(PosixFilePermission.OTHERS_READ);
return perms;
}
}

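Here FileLock serves as the cross-process lock and ReentrantReadWriteLock as the cross-thread lock, and together they keep the data consistent as a whole when the file is written to disk.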
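Stripped of the Map and JSON parts, the two-level locking idea can be sketched on a single counter stored in a file. This is an illustration under stated assumptions rather than the class above: `TwoLevelLockSketch` and `increment` are hypothetical names, and a plain ReentrantLock stands in for the ReentrantReadWriteLock to keep the example short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.locks.ReentrantLock;

public class TwoLevelLockSketch {
    // Level 1: serializes the threads of this JVM (a FileLock cannot do that).
    private static final ReentrantLock JVM_LOCK = new ReentrantLock();

    /** Atomically increments a counter stored as 8 bytes at the start of the file. */
    static long increment(Path file) {
        JVM_LOCK.lock();
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
             // Level 2: serializes this JVM against other processes.
             FileLock lock = channel.lock()) {
            ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
            while (buf.hasRemaining() && channel.read(buf) >= 0) {
                // read the stored value, if there is one
            }
            buf.flip();
            long value = (buf.remaining() == Long.BYTES) ? buf.getLong() : 0L;
            buf.clear();
            buf.putLong(value + 1);
            buf.flip();
            channel.position(0); // overwrite the old value in place
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
            return value + 1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            JVM_LOCK.unlock();
        }
    }
}
```

The in-JVM lock is always taken before the file lock and released after it, so a thread never requests a file lock while another thread of the same JVM still holds one.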

