Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stage 1, Андрей Чешев, Политех #16

Merged
merged 18 commits into from
Feb 29, 2024
136 changes: 136 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/ServerImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package ru.vk.itmo.test.andreycheshev;

import one.nio.http.HttpServer;
import one.nio.http.HttpServerConfig;
import one.nio.http.HttpSession;
import one.nio.http.Request;
import one.nio.http.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.andreycheshev.dao.PersistentReferenceDao;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;

import static one.nio.http.Request.METHOD_DELETE;
import static one.nio.http.Request.METHOD_GET;
import static one.nio.http.Request.METHOD_PUT;
import static one.nio.http.Response.ACCEPTED;
import static one.nio.http.Response.BAD_REQUEST;
import static one.nio.http.Response.CREATED;
import static one.nio.http.Response.INTERNAL_ERROR;
import static one.nio.http.Response.METHOD_NOT_ALLOWED;
import static one.nio.http.Response.NOT_FOUND;
import static one.nio.http.Response.OK;

public class ServerImpl extends HttpServer {
private static final Logger logger = LoggerFactory.getLogger(ServerImpl.class);
private static final String REQUEST_PATH = "/v0/entity";
private static final String ID = "id=";

private final PersistentReferenceDao dao;

public ServerImpl(HttpServerConfig config, Config daoConfig) throws IOException {
super(config);

this.dao = new PersistentReferenceDao(daoConfig);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dao, в принципе, тоже можно было бы инстанцировать в сервисе и в сервер просто инжектить

}

private Response get(final Request request) {
String id = request.getParameter(ID);
if (id == null || id.isEmpty()) {
return new Response(BAD_REQUEST, Response.EMPTY);
}

Entry<MemorySegment> entry = dao.get(fromString(id));

return (entry == null)
? new Response(NOT_FOUND, Response.EMPTY)
: new Response(OK, entry.value().toArray(ValueLayout.JAVA_BYTE));
}

private Response put(final Request request) {
String id = request.getParameter(ID);
if (id == null || id.isEmpty()) {
return new Response(BAD_REQUEST, Response.EMPTY);
}

Entry<MemorySegment> entry = new BaseEntry<>(
fromString(id),
MemorySegment.ofArray(request.getBody())
);
dao.upsert(entry);

return new Response(CREATED, Response.EMPTY);
}

private Response delete(final Request request) {
String id = request.getParameter(ID);
if (id == null || id.isEmpty()) {
return new Response(BAD_REQUEST, Response.EMPTY);
}

Entry<MemorySegment> entry = new BaseEntry<>(fromString(id), null);
dao.upsert(entry);

return new Response(ACCEPTED, Response.EMPTY);
}

@Override
public void handleRequest(Request request, HttpSession session) {
String path = request.getPath();
if (!path.equals(REQUEST_PATH)) {
sendResponse(
new Response(BAD_REQUEST, Response.EMPTY),
session
);
return;
}

int method = request.getMethod();

Response response;
try {
response = switch (method) {
case METHOD_GET -> get(request);
case METHOD_PUT -> put(request);
case METHOD_DELETE -> delete(request);
default -> new Response(METHOD_NOT_ALLOWED, Response.EMPTY);
};
} catch (Exception e) {
logger.error("Internal error of the DAO operation", e);
sendResponse(new Response(INTERNAL_ERROR, Response.EMPTY), session);
return;
}

sendResponse(response, session);
}

private void sendResponse(Response response, HttpSession session) {
try {
session.sendResponse(response);
} catch (IOException e) {
throw new UncheckedIOException("Error while sending response to the client", e);
}
}

private MemorySegment fromString(String data) {
return MemorySegment.ofArray(data.getBytes(StandardCharsets.UTF_8));
}

@Override
public synchronized void stop() {
super.stop();
try {
dao.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
33 changes: 33 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/ServerStarter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package ru.vk.itmo.test.andreycheshev;

import one.nio.http.HttpServerConfig;
import one.nio.server.AcceptorConfig;
import ru.vk.itmo.dao.Config;

import java.io.IOException;
import java.nio.file.Path;

public final class ServerStarter {
private static final Path DIT_PATH = Path.of("/home/andrey/andrey/tmp");
private static final int THRESHOLD_BYTES = 100000;
private static final int PORT = 8080;

private ServerStarter() {

}

public static void main(String[] args) throws IOException {
AcceptorConfig acceptorConfig = new AcceptorConfig();
acceptorConfig.port = PORT;
acceptorConfig.reusePort = true;
HttpServerConfig serverConfig = new HttpServerConfig();
serverConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
serverConfig.closeSessions = true;

Config daoConfig = new Config(DIT_PATH, THRESHOLD_BYTES);

ServerImpl server = new ServerImpl(serverConfig, daoConfig);

server.start();
}
}
65 changes: 65 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/ServiceImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package ru.vk.itmo.test.andreycheshev;

import one.nio.http.HttpServerConfig;
import one.nio.server.AcceptorConfig;
import one.nio.server.Server;
import ru.vk.itmo.Service;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.test.ServiceFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

public class ServiceImpl implements Service {
private static final int THRESHOLD_BYTES = 100000;

private final HttpServerConfig serverConfig;
private final Config daoConfig;

private Server server;

public ServiceImpl(ServiceConfig config) {
this.serverConfig = createServerConfig(config);
this.daoConfig = new Config(config.workingDir(), THRESHOLD_BYTES);
}

private HttpServerConfig createServerConfig(ServiceConfig config) {
AcceptorConfig acceptorConfig = new AcceptorConfig();
acceptorConfig.port = config.selfPort();
acceptorConfig.reusePort = true;

HttpServerConfig newServerConfig = new HttpServerConfig();
newServerConfig.acceptors = new AcceptorConfig[]{acceptorConfig};
newServerConfig.closeSessions = true;

return newServerConfig;
}

@Override
public CompletableFuture<Void> start() throws IOException {
try {
server = new ServerImpl(serverConfig, daoConfig);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
server.start();
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> stop() throws IOException {
server.stop();
return CompletableFuture.completedFuture(null);
}

@ServiceFactory(stage = 1)
public static class Factory implements ServiceFactory.Factory {

@Override
public Service create(ServiceConfig config) {
return new ServiceImpl(config);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ru.vk.itmo.test.andreycheshev.dao;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

/**
* Growable buffer with {@link ByteBuffer} and {@link MemorySegment} interface.
*
* @author incubos
*/
final class ByteArraySegment {
private byte[] array;
private MemorySegment segment;

ByteArraySegment(final int capacity) {
this.array = new byte[capacity];
this.segment = MemorySegment.ofArray(array);
}

void withArray(final ArrayConsumer consumer) throws IOException {
consumer.process(array);
}

MemorySegment segment() {
return segment;
}

void ensureCapacity(final long size) {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Too big!");
}

final int capacity = (int) size;
if (array.length >= capacity) {
return;
}

// Grow to the nearest bigger power of 2
final int newSize = Integer.highestOneBit(capacity) << 1;
array = new byte[newSize];
segment = MemorySegment.ofArray(array);
}

interface ArrayConsumer {
void process(byte[] array) throws IOException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ru.vk.itmo.test.andreycheshev.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Filters non tombstone {@link Entry}s.
*
* @author incubos
*/
final class LiveFilteringIterator implements Iterator<Entry<MemorySegment>> {
private final Iterator<Entry<MemorySegment>> delegate;
private Entry<MemorySegment> next;

LiveFilteringIterator(final Iterator<Entry<MemorySegment>> delegate) {
this.delegate = delegate;
skipTombstones();
}

private void skipTombstones() {
while (delegate.hasNext()) {
final Entry<MemorySegment> entry = delegate.next();
if (entry.value() != null) {
this.next = entry;
break;
}
}
}

@Override
public boolean hasNext() {
return next != null;
}

@Override
public Entry<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

// Consume
final Entry<MemorySegment> result = next;
next = null;

skipTombstones();

return result;
}
}
49 changes: 49 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/dao/MemTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ru.vk.itmo.test.andreycheshev.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
* Memory table.
*
* @author incubos
*/
final class MemTable {
private final NavigableMap<MemorySegment, Entry<MemorySegment>> map =
new ConcurrentSkipListMap<>(
MemorySegmentComparator.INSTANCE);

boolean isEmpty() {
return map.isEmpty();
}

Iterator<Entry<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
if (from == null && to == null) {
// All
return map.values().iterator();
} else if (from == null) {
// Head
return map.headMap(to).values().iterator();
} else if (to == null) {
// Tail
return map.tailMap(from).values().iterator();
} else {
// Slice
return map.subMap(from, to).values().iterator();
}
}

Entry<MemorySegment> get(final MemorySegment key) {
return map.get(key);
}

Entry<MemorySegment> upsert(final Entry<MemorySegment> entry) {
return map.put(entry.key(), entry);
}
}
Loading
Loading