Add a separate queue for non-interactive users

A new internal user group, "Non-Interactive Users" is added. members
of this group are not expected to perform interactive operations on
the gerrit web frontend.

However, sometimes such a user may need a separate thread pool in
order to prevent it from grabbing threads from the interactive users.

This change introduces a second thread pool, which separates
operations from the non-interactive users from the interactive ones.
This ensures that the interactive users can keep working when
resources are tight.

Change-Id: I16334dd84ec50e1a6ca894e635c8beea9bd42115
This commit is contained in:
Nico Sallembien 2010-05-18 16:40:10 -07:00
parent 8ef3b69f35
commit fc53f7fd7a
12 changed files with 224 additions and 37 deletions

View File

@ -1447,6 +1447,23 @@ are queued and serviced in a first-come-first-serve order.
+
By default, 1.5x the number of CPUs available to the JVM.
[[sshd.batchThreads]]sshd.batchThreads::
+
Number of threads to allocate for SSH command requests from
non-interactive users. If equals to 0, then all non-interactive
requests are executed in the same queue as interactive requests.
+
Any other value will remove the number of threads from the queue
allocated to interactive users, and create a separate thread pool
of the requested size, which will be used to run commands from
non-interactive users.
+
If the number of threads requested for non-interactive users is larger
than the total number of threads allocated in sshd.threads, then the
value of sshd.threads is increased to accomodate the requested value.
+
By default, 0.
[[sshd.streamThreads]]sshd.streamThreads::
+
Number of threads to use when formatting events to asynchronous

View File

@ -15,6 +15,7 @@
package com.google.gerrit.pgm.http.jetty;
import static com.google.gerrit.server.config.ConfigUtil.getTimeUnit;
import static com.google.inject.Scopes.SINGLETON;
import static java.util.concurrent.TimeUnit.MINUTES;
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
@ -23,7 +24,8 @@ import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gerrit.sshd.CommandExecutor;
import com.google.gerrit.sshd.CommandExecutorQueueProvider;
import com.google.gerrit.sshd.QueueProvider;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@ -73,33 +75,28 @@ public class ProjectQoSFilter implements Filter {
private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE);
public static class Module extends ServletModule {
private final WorkQueue.Executor executor;
@Inject
Module(@CommandExecutor final WorkQueue.Executor executor) {
this.executor = executor;
}
@Override
protected void configureServlets() {
bind(WorkQueue.Executor.class).annotatedWith(CommandExecutor.class)
.toInstance(executor);
bind(QueueProvider.class).to(CommandExecutorQueueProvider.class)
.in(SINGLETON);
filterRegex(FILTER_RE).through(ProjectQoSFilter.class);
}
}
private final Provider<CurrentUser> userProvider;
private final WorkQueue.Executor executor;
private final QueueProvider queue;
private final ServletContext context;
private final long maxWait;
@SuppressWarnings("unchecked")
@Inject
ProjectQoSFilter(final Provider<CurrentUser> userProvider,
@CommandExecutor final WorkQueue.Executor executor,
final ServletContext context, @GerritServerConfig final Config cfg) {
QueueProvider queue, final ServletContext context,
@GerritServerConfig final Config cfg) {
this.userProvider = userProvider;
this.executor = executor;
this.queue = queue;
this.context = context;
this.maxWait = getTimeUnit(cfg, "httpd", null, "maxwait", 5, MINUTES);
}
@ -111,6 +108,8 @@ public class ProjectQoSFilter implements Filter {
final HttpServletResponse rsp = (HttpServletResponse) response;
final Continuation cont = ContinuationSupport.getContinuation(req);
WorkQueue.Executor executor = getExecutor();
if (cont.isInitial()) {
TaskThunk task = new TaskThunk(cont, req);
if (maxWait > 0) {
@ -143,6 +142,16 @@ public class ProjectQoSFilter implements Filter {
}
}
private WorkQueue.Executor getExecutor() {
WorkQueue.Executor executor;
if (userProvider.get().isBatchUser()) {
executor = queue.getBatchQueue();
} else {
executor = queue.getInteractiveQueue();
}
return executor;
}
@Override
public void init(FilterConfig config) {
}
@ -210,6 +219,7 @@ public class ProjectQoSFilter implements Filter {
@Override
public void onTimeout(Continuation self) {
WorkQueue.Executor executor = getExecutor();
executor.remove(this);
}

View File

@ -14,6 +14,7 @@
package com.google.gerrit.reviewdb;
import com.google.gerrit.reviewdb.AccountGroup.Id;
import com.google.gwtorm.client.Column;
import com.google.gwtorm.client.StringKey;
@ -78,6 +79,10 @@ public final class SystemConfig {
@Column(id = 7)
public Project.NameKey wildProjectName;
/** Identity of the batch users group */
@Column(id = 8)
public AccountGroup.Id batchUsersGroupId;
protected SystemConfig() {
}
}

View File

@ -59,6 +59,11 @@ public abstract class CurrentUser {
/** Set of changes starred by this user. */
public abstract Set<Change.Id> getStarredChanges();
/** Is the user a non-interactive user? */
public boolean isBatchUser() {
return getEffectiveGroups().contains(authConfig.getBatchUsersGroup());
}
@Deprecated
public final boolean isAdministrator() {
return getEffectiveGroups().contains(authConfig.getAdministratorsGroup());

View File

@ -46,6 +46,7 @@ public class AuthConfig {
private final AccountGroup.Id administratorGroup;
private final Set<AccountGroup.Id> anonymousGroups;
private final Set<AccountGroup.Id> registeredGroups;
private final AccountGroup.Id batchUsersGroup;
private final boolean allowGoogleAccountUpgrade;
@ -65,6 +66,7 @@ public class AuthConfig {
registeredGroups = Collections.unmodifiableSet(r);
anonymousGroups = Collections.singleton(s.anonymousGroupId);
administratorGroup = s.adminGroupId;
batchUsersGroup = s.batchUsersGroupId;
if (authType == AuthType.OPENID) {
allowGoogleAccountUpgrade =
@ -117,6 +119,11 @@ public class AuthConfig {
return administratorGroup;
}
/** Identity of the group whose service is degraded to lower priority. */
public AccountGroup.Id getBatchUsersGroup() {
return batchUsersGroup;
}
/** Groups that all users, including anonymous users, belong to. */
public Set<AccountGroup.Id> getAnonymousGroups() {
return anonymousGroups;

View File

@ -140,11 +140,22 @@ public class SchemaCreator {
c.accountGroupNames().insert(
Collections.singleton(new AccountGroupName(registered)));
final AccountGroup batchUsers =
new AccountGroup(new AccountGroup.NameKey("Non-Interactive Users"),
new AccountGroup.Id(c.nextAccountGroupId()));
batchUsers.setDescription("Users who perform batch actions on Gerrit");
batchUsers.setOwnerGroupId(admin.getId());
batchUsers.setType(AccountGroup.Type.INTERNAL);
c.accountGroups().insert(Collections.singleton(batchUsers));
c.accountGroupNames().insert(
Collections.singleton(new AccountGroupName(batchUsers)));
final SystemConfig s = SystemConfig.create();
s.registerEmailPrivateKey = SignedToken.generateRandomKey();
s.adminGroupId = admin.getId();
s.anonymousGroupId = anonymous.getId();
s.registeredGroupId = registered.getId();
s.batchUsersGroupId = batchUsers.getId();
s.wildProjectName = DEFAULT_WILD_NAME;
try {
s.sitePath = site_path.getCanonicalPath();

View File

@ -32,7 +32,7 @@ import java.util.List;
/** A version of the database schema. */
public abstract class SchemaVersion {
/** The current schema version. */
private static final Class<? extends SchemaVersion> C = Schema_32.class;
private static final Class<? extends SchemaVersion> C = Schema_33.class;
public static class Module extends AbstractModule {
@Override

View File

@ -0,0 +1,49 @@
// Copyright (C) 2010 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.schema;
import com.google.gerrit.reviewdb.AccountGroup;
import com.google.gerrit.reviewdb.AccountGroupName;
import com.google.gerrit.reviewdb.ReviewDb;
import com.google.gerrit.reviewdb.SystemConfig;
import com.google.gwtorm.client.OrmException;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.Collections;
public class Schema_33 extends SchemaVersion {
@Inject
Schema_33(Provider<Schema_32> prior) {
super(prior);
}
@Override
protected void migrateData(ReviewDb db, UpdateUI ui) throws OrmException {
SystemConfig config = db.systemConfig().all().toList().get(0);
final AccountGroup batchUsers =
new AccountGroup(new AccountGroup.NameKey("Non-Interactive Users"),
new AccountGroup.Id(db.nextAccountGroupId()));
batchUsers.setDescription("Users who perform batch actions on Gerrit");
batchUsers.setOwnerGroupId(config.adminGroupId);
batchUsers.setType(AccountGroup.Type.INTERNAL);
db.accountGroups().insert(Collections.singleton(batchUsers));
db.accountGroupNames().insert(
Collections.singleton(new AccountGroupName(batchUsers)));
config.batchUsersGroupId = batchUsers.getId();
db.systemConfig().update(Collections.singleton(config));
}
}

View File

@ -14,6 +14,7 @@
package com.google.gerrit.sshd;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
@ -24,32 +25,25 @@ import org.eclipse.jgit.lib.Config;
import java.util.concurrent.ThreadFactory;
class CommandExecutorProvider implements Provider<WorkQueue.Executor> {
private final int poolSize;
private final WorkQueue queues;
private final QueueProvider queues;
private final CurrentUser user;
@Inject
CommandExecutorProvider(@GerritServerConfig final Config config,
final WorkQueue wq) {
final int cores = Runtime.getRuntime().availableProcessors();
poolSize = config.getInt("sshd", "threads", 3 * cores / 2);
queues = wq;
CommandExecutorProvider(QueueProvider queues,
CurrentUser user) {
this.queues = queues;
this.user = user;
}
@Override
public WorkQueue.Executor get() {
final WorkQueue.Executor executor;
executor = queues.createQueue(poolSize, "SSH-Worker");
final ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(final Runnable task) {
final Thread t = parent.newThread(task);
t.setPriority(Thread.MIN_PRIORITY);
return t;
}
});
WorkQueue.Executor executor;
if (user.isBatchUser()) {
executor = queues.getBatchQueue();
} else {
executor = queues.getInteractiveQueue();
}
return executor;
}
}

View File

@ -0,0 +1,76 @@
// Copyright (C) 2010 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.sshd;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import org.eclipse.jgit.lib.Config;
import java.util.concurrent.ThreadFactory;
public class CommandExecutorQueueProvider implements QueueProvider {
private int poolSize;
private final int batchThreads;
private final WorkQueue.Executor interactiveExecutor;
private final WorkQueue.Executor batchExecutor;
@Inject
public CommandExecutorQueueProvider(@GerritServerConfig final Config config,
final WorkQueue queues) {
final int cores = Runtime.getRuntime().availableProcessors();
poolSize = config.getInt("sshd", "threads", 3 * cores / 2);
batchThreads = config.getInt("sshd", "batchThreads", 0);
if (batchThreads > poolSize) {
poolSize += batchThreads;
}
int interactiveThreads = Math.max(1, poolSize - batchThreads);
interactiveExecutor = queues.createQueue(interactiveThreads,
"SSH-Interactive-Worker");
if (batchThreads != 0) {
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker");
setThreadFactory(batchExecutor);
} else {
batchExecutor = interactiveExecutor;
}
setThreadFactory(interactiveExecutor);
}
private void setThreadFactory(WorkQueue.Executor executor) {
final ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(final Runnable task) {
final Thread t = parent.newThread(task);
t.setPriority(Thread.MIN_PRIORITY);
return t;
}
});
}
@Override
public WorkQueue.Executor getInteractiveQueue() {
return interactiveExecutor;
}
@Override
public WorkQueue.Executor getBatchQueue() {
return batchExecutor;
}
}

View File

@ -0,0 +1,11 @@
package com.google.gerrit.sshd;
import com.google.gerrit.server.git.WorkQueue;
public interface QueueProvider {
public WorkQueue.Executor getInteractiveQueue();
public WorkQueue.Executor getBatchQueue();
}

View File

@ -40,6 +40,7 @@ import com.google.gerrit.util.cli.CmdLineParser;
import com.google.gerrit.util.cli.OptionHandlerFactory;
import com.google.gerrit.util.cli.OptionHandlerUtil;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryProvider;
import com.google.inject.servlet.RequestScoped;
@ -72,11 +73,9 @@ public class SshModule extends FactoryModule {
.toInstance(new DispatchCommandProvider("", Commands.CMD_ROOT));
bind(CommandFactoryProvider.class);
bind(CommandFactory.class).toProvider(CommandFactoryProvider.class);
bind(WorkQueue.Executor.class).annotatedWith(CommandExecutor.class)
.toProvider(CommandExecutorProvider.class).in(SINGLETON);
bind(WorkQueue.Executor.class).annotatedWith(StreamCommandExecutor.class)
.toProvider(StreamCommandExecutorProvider.class).in(SINGLETON);
bind(QueueProvider.class).to(CommandExecutorQueueProvider.class).in(SINGLETON);
bind(PublickeyAuthenticator.class).to(DatabasePubKeyAuth.class);
bind(PasswordAuthenticator.class).to(DatabasePasswordAuth.class);
@ -107,6 +106,9 @@ public class SshModule extends FactoryModule {
bind(IdentifiedUser.class).toProvider(SshIdentifiedUserProvider.class).in(
SshScope.REQUEST);
bind(WorkQueue.Executor.class).annotatedWith(CommandExecutor.class)
.toProvider(CommandExecutorProvider.class).in(SshScope.REQUEST);
install(new GerritRequestModule());
}